1use itertools::multiunzip;
2use serde_json::json;
3
4use crate::prelude::*;
5
6#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
7#[cfg_attr(feature = "ts_rs", derive(TS))]
8pub struct UserMarketingConsent {
9 pub id: Uuid,
10 pub course_id: Uuid,
11 pub course_language_group_id: Uuid,
12 pub user_id: Uuid,
13 pub user_mailchimp_id: Option<String>,
14 pub consent: bool,
15 pub email_subscription_in_mailchimp: Option<String>,
16 pub created_at: DateTime<Utc>,
17 pub updated_at: DateTime<Utc>,
18 pub deleted_at: Option<DateTime<Utc>>,
19 pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
20}
21
22#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
23#[cfg_attr(feature = "ts_rs", derive(TS))]
24pub struct UserMarketingConsentWithDetails {
25 pub id: Uuid,
26 pub course_id: Uuid,
27 pub course_language_group_id: Uuid,
28 pub user_id: Uuid,
29 pub user_mailchimp_id: Option<String>,
30 pub consent: bool,
31 pub email_subscription_in_mailchimp: Option<String>,
32 pub created_at: DateTime<Utc>,
33 pub updated_at: DateTime<Utc>,
34 pub deleted_at: Option<DateTime<Utc>>,
35 pub synced_to_mailchimp_at: Option<DateTime<Utc>>,
36 pub first_name: Option<String>,
37 pub last_name: Option<String>,
38 pub email: String,
39 pub course_name: String,
40 pub locale: Option<String>,
41 pub completed_course_at: Option<DateTime<Utc>>,
42 pub research_consent: Option<bool>,
43 pub country: Option<String>,
44}
45
46#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
47#[cfg_attr(feature = "ts_rs", derive(TS))]
48pub struct UserEmailSubscription {
49 pub user_id: Uuid,
50 pub email: String,
51 pub email_subscription_in_mailchimp: Option<String>,
52 pub user_mailchimp_id: Option<String>,
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
56#[cfg_attr(feature = "ts_rs", derive(TS))]
57pub struct MarketingMailingListAccessToken {
58 pub id: Uuid,
59 pub course_id: Uuid,
60 pub mailchimp_mailing_list_id: String,
61 pub course_language_group_id: Uuid,
62 pub server_prefix: String,
63 pub access_token: String,
64 pub created_at: DateTime<Utc>,
65 pub updated_at: DateTime<Utc>,
66 pub deleted_at: Option<DateTime<Utc>>,
67}
68
69#[derive(Debug, FromRow)]
70pub struct MailchimpCourseTag {
71 pub id: Uuid,
72 pub marketing_mailing_list_access_token_id: Uuid,
73 pub course_language_group_id: Uuid,
74 pub tag_name: String,
75 pub tag_id: String,
76 pub created_at: DateTime<Utc>,
77 pub updated_at: DateTime<Utc>,
78 pub deleted_at: Option<DateTime<Utc>>,
79}
80
81pub async fn upsert_marketing_consent(
82 conn: &mut PgConnection,
83 course_id: Uuid,
84 course_language_group_id: Uuid,
85 user_id: &Uuid,
86 email_subscription: &str,
87 marketing_consent: bool,
88) -> sqlx::Result<Uuid> {
89 let result = sqlx::query!(
90 r#"
91 INSERT INTO user_marketing_consents (user_id, course_id, course_language_group_id, consent, email_subscription_in_mailchimp)
92 VALUES ($1, $2, $3, $4, $5)
93 ON CONFLICT (user_id, course_language_group_id)
94 DO UPDATE
95 SET
96 consent = $4,
97 email_subscription_in_mailchimp = $5
98 RETURNING id
99 "#,
100 user_id,
101 course_id,
102 course_language_group_id,
103 marketing_consent,
104 email_subscription
105 )
106 .fetch_one(conn)
107 .await?;
108
109 Ok(result.id)
110}
111
112pub async fn fetch_user_marketing_consent(
113 conn: &mut PgConnection,
114 course_id: Uuid,
115 user_id: &Uuid,
116) -> sqlx::Result<UserMarketingConsent> {
117 let result = sqlx::query_as!(
118 UserMarketingConsent,
119 "
120 SELECT *
121 FROM user_marketing_consents
122 WHERE user_id = $1 AND course_id = $2
123 ",
124 user_id,
125 course_id,
126 )
127 .fetch_one(conn)
128 .await?;
129
130 Ok(result)
131}
132
133pub async fn fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
135 conn: &mut PgConnection,
136 course_language_group_id: Uuid,
137) -> sqlx::Result<Vec<UserMarketingConsentWithDetails>> {
138 let result = sqlx::query_as!(
139 UserMarketingConsentWithDetails,
140 "
141 SELECT
142 umc.id,
143 umc.course_id,
144 umc.course_language_group_id,
145 umc.user_id,
146 umc.user_mailchimp_id,
147 umc.consent,
148 umc.email_subscription_in_mailchimp,
149 umc.created_at,
150 umc.updated_at,
151 umc.deleted_at,
152 umc.synced_to_mailchimp_at,
153 u.first_name AS first_name,
154 u.last_name AS last_name,
155 u.country AS country,
156 u.email AS email,
157 c.name AS course_name,
158 c.language_code AS locale,
159 CASE WHEN cmc.passed IS NOT NULL THEN cmc.completion_date ELSE NULL END AS completed_course_at,
160 COALESCE(csfa.research_consent, urc.research_consent) AS research_consent
161 FROM user_marketing_consents AS umc
162 JOIN user_details AS u ON u.user_id = umc.user_id
163 JOIN courses AS c ON c.id = umc.course_id
164 LEFT JOIN course_module_completions AS cmc
165 ON cmc.user_id = umc.user_id AND cmc.course_id = umc.course_id
166 LEFT JOIN course_specific_consent_form_answers AS csfa
167 ON csfa.course_id = umc.course_id AND csfa.user_id = umc.user_id
168 LEFT JOIN user_research_consents AS urc
169 ON urc.user_id = umc.user_id
170 LEFT JOIN mailchimp_course_tags AS tags
171 ON tags.course_language_group_id = umc.course_language_group_id
172 WHERE umc.course_language_group_id = $1
173 AND (
174 umc.synced_to_mailchimp_at IS NULL
175 OR umc.synced_to_mailchimp_at < umc.updated_at
176 OR csfa.updated_at > umc.synced_to_mailchimp_at
177 OR urc.updated_at > umc.synced_to_mailchimp_at
178 OR cmc.updated_at > umc.synced_to_mailchimp_at
179 OR u.updated_at > umc.synced_to_mailchimp_at
180 OR EXISTS (
181 SELECT 1
182 FROM mailchimp_course_tags
183 WHERE mailchimp_course_tags.course_language_group_id = umc.course_language_group_id
184 AND mailchimp_course_tags.updated_at > umc.synced_to_mailchimp_at
185 AND deleted_at IS NULL
186 )
187 )
188 ",
189 course_language_group_id
190 )
191 .fetch_all(conn)
192 .await?;
193
194 Ok(result)
195}
196
197pub async fn fetch_all_unsynced_updated_emails(
199 conn: &mut PgConnection,
200 course_language_group_id: Uuid,
201) -> sqlx::Result<Vec<UserEmailSubscription>> {
202 let result = sqlx::query_as!(
203 UserEmailSubscription,
204 "
205 SELECT
206 umc.user_id,
207 u.email AS email,
208 umc.email_subscription_in_mailchimp,
209 umc.user_mailchimp_id
210 FROM user_marketing_consents AS umc
211 JOIN user_details AS u ON u.user_id = umc.user_id
212 WHERE umc.course_language_group_id = $1
213 AND umc.synced_to_mailchimp_at < u.updated_at
214 ",
215 course_language_group_id
216 )
217 .fetch_all(conn)
218 .await?;
219
220 Ok(result)
221}
222
223pub async fn update_synced_to_mailchimp_at_to_all_synced_users(
225 conn: &mut PgConnection,
226 ids: &[Uuid],
227) -> ModelResult<()> {
228 sqlx::query!(
229 "
230UPDATE user_marketing_consents
231SET synced_to_mailchimp_at = now()
232WHERE user_id IN (
233 SELECT UNNEST($1::uuid [])
234 )
235",
236 &ids
237 )
238 .execute(conn)
239 .await?;
240 Ok(())
241}
242
243pub async fn update_user_mailchimp_id_at_to_all_synced_users(
245 pool: &mut PgConnection,
246 user_contact_pairs: Vec<(String, String)>,
247) -> ModelResult<()> {
248 let (user_ids_raw, user_mailchimp_ids): (Vec<_>, Vec<_>) =
249 user_contact_pairs.into_iter().unzip();
250
251 let user_ids: Vec<Uuid> = user_ids_raw
253 .into_iter()
254 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
255 .collect();
256
257 sqlx::query!(
258 "
259UPDATE user_marketing_consents
260SET user_mailchimp_id = updated_data.user_mailchimp_id
261FROM (
262 SELECT UNNEST($1::uuid[]) AS user_id, UNNEST($2::text[]) AS user_mailchimp_id
263) AS updated_data
264WHERE user_marketing_consents.user_id = updated_data.user_id
265",
266 &user_ids,
267 &user_mailchimp_ids
268 )
269 .execute(pool)
270 .await?;
271 Ok(())
272}
273
274pub async fn update_unsubscribed_users_from_mailchimp_in_bulk(
276 conn: &mut PgConnection,
277 mailchimp_data: Vec<(String, String, String, String)>,
278) -> anyhow::Result<()> {
279 let (
280 user_ids_raw,
281 timestamps_raw,
282 course_language_group_ids_raw,
283 email_subscriptions_in_mailchimp,
284 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) = multiunzip(mailchimp_data);
285
286 let user_ids: Vec<Uuid> = user_ids_raw
287 .into_iter()
288 .filter_map(|user_id| Uuid::parse_str(&user_id).ok())
289 .collect();
290
291 let timestamps: Vec<DateTime<Utc>> = timestamps_raw
292 .into_iter()
293 .filter_map(|ts| {
294 DateTime::parse_from_rfc3339(&ts)
295 .ok()
296 .map(|dt| dt.with_timezone(&Utc))
297 })
298 .collect();
299
300 let course_language_group_ids: Vec<Uuid> = course_language_group_ids_raw
301 .into_iter()
302 .filter_map(|lang_id| Uuid::parse_str(&lang_id).ok())
303 .collect();
304
305 sqlx::query!(
306 "
307 UPDATE user_marketing_consents
308 SET consent = false,
309 email_subscription_in_mailchimp = updated_data.email_subscription_in_mailchimp,
310 synced_to_mailchimp_at = updated_data.last_updated
311 FROM (
312 SELECT UNNEST($1::Uuid[]) AS user_id,
313 UNNEST($2::timestamptz[]) AS last_updated,
314 UNNEST($3::Uuid[]) AS course_language_group_id,
315 UNNEST($4::text[]) AS email_subscription_in_mailchimp
316
317 ) AS updated_data
318 WHERE user_marketing_consents.user_id = updated_data.user_id
319 AND user_marketing_consents.consent = true
320 AND user_marketing_consents.synced_to_mailchimp_at < updated_data.last_updated
321 AND user_marketing_consents.course_language_group_id = updated_data.course_language_group_id
322 ",
323 &user_ids,
324 ×tamps,
325 &course_language_group_ids,
326 &email_subscriptions_in_mailchimp
327 )
328 .execute(conn)
329 .await?;
330 Ok(())
331}
332
333pub async fn fetch_all_marketing_mailing_list_access_tokens(
334 conn: &mut PgConnection,
335) -> sqlx::Result<Vec<MarketingMailingListAccessToken>> {
336 let results = sqlx::query_as!(
337 MarketingMailingListAccessToken,
338 "
339 SELECT
340 id,
341 course_id,
342 course_language_group_id,
343 server_prefix,
344 access_token,
345 mailchimp_mailing_list_id,
346 created_at,
347 updated_at,
348 deleted_at
349 FROM marketing_mailing_list_access_tokens
350 "
351 )
352 .fetch_all(conn)
353 .await?;
354
355 Ok(results)
356}
357
358pub async fn fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
359 conn: &mut PgConnection,
360 course_language_group_id: Uuid,
361 marketing_mailing_access_token_id: Uuid,
362) -> sqlx::Result<Vec<serde_json::Value>> {
363 let results = sqlx::query!(
364 "
365 SELECT
366 tag_name,
367 tag_id
368 FROM mailchimp_course_tags
369 WHERE course_language_group_id = $1
370 AND marketing_mailing_list_access_token_id = $2
371 AND deleted_at IS NULL
372 ",
373 course_language_group_id,
374 marketing_mailing_access_token_id
375 )
376 .fetch_all(conn)
377 .await?;
378
379 let tag_objects: Vec<serde_json::Value> = results
380 .into_iter()
381 .filter_map(|row| {
382 let tag_name = row.tag_name.trim();
383 let tag_id = row.tag_id.trim();
384
385 if tag_name.is_empty() {
386 return None;
387 }
388
389 Some(json!({
390 "name": tag_name,
391 "id": tag_id,
392 "status": "active"
393 }))
394 })
395 .collect();
396
397 Ok(tag_objects)
398}
399
400pub async fn upsert_tag(
401 conn: &mut PgConnection,
402 course_language_group_id: Uuid,
403 marketing_mailing_access_token_id: Uuid,
404 tag_id: String,
405 tag_name: String,
406) -> sqlx::Result<()> {
407 sqlx::query!(
408 "
409 INSERT INTO mailchimp_course_tags (
410 tag_name,
411 tag_id,
412 course_language_group_id,
413 marketing_mailing_list_access_token_id
414 )
415 VALUES ($1, $2, $3, $4)
416 ON CONFLICT (course_language_group_id, tag_name)
417 DO UPDATE
418 SET
419 tag_name = EXCLUDED.tag_name
420 ",
421 tag_name,
422 tag_id,
423 course_language_group_id,
424 marketing_mailing_access_token_id,
425 )
426 .execute(&mut *conn)
427 .await?;
428 Ok(())
429}
430
431pub async fn delete_tag(
432 conn: &mut PgConnection,
433 deleted_tag_id: String,
434 course_language_group_id: Uuid,
435) -> sqlx::Result<()> {
436 sqlx::query!(
437 "
438 UPDATE mailchimp_course_tags
439 SET deleted_at = now()
440 WHERE tag_id = $1
441 AND course_language_group_id = $2
442 AND deleted_at IS NULL
443 ",
444 deleted_tag_id,
445 course_language_group_id
446 )
447 .execute(&mut *conn)
448 .await?;
449
450 Ok(())
451}