1use crate::config::program_config::ProgramConfig;
2use crate::prelude::*;
3use crate::setup_tracing;
4use dotenvy::dotenv;
5use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
6use headless_lms_models::marketing_consents::UserEmailSubscription;
7use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
8use headless_lms_utils::http::REQWEST_CLIENT;
9use secrecy::ExposeSecret;
10use serde_json::json;
11use sqlx::{PgConnection, PgPool};
12use std::{
13 env,
14 time::{Duration, Instant},
15};
16use uuid::Uuid;
17
18mod batch_client;
19mod mailchimp_ops;
20mod policy_tags;
21
22use batch_client::MAX_MAILCHIMP_BATCH_SIZE;
23use mailchimp_ops::{MailchimpExecutor, MailchimpOperation};
24use policy_tags::sync_policy_tags_for_users;
25use reqwest::Method;
26
27#[derive(Debug, Deserialize)]
28struct MailchimpField {
29 field_id: String,
30 field_name: String,
31}
32
/// Compile-time description of a merge field the syncer wants every list to have.
#[derive(Debug)]
struct FieldSchema {
    /// Merge tag, sent to Mailchimp as `tag` and matched against existing fields.
    tag: &'static str,
    /// Human-readable label, sent to Mailchimp as `name` and used in log messages.
    name: &'static str,
    /// Value sent to Mailchimp as the field's `default_value`.
    default_value: &'static str,
}
39
40const REQUIRED_FIELDS: &[FieldSchema] = &[
41 FieldSchema {
42 tag: "FNAME",
43 name: "First Name",
44 default_value: "",
45 },
46 FieldSchema {
47 tag: "LNAME",
48 name: "Last Name",
49 default_value: "",
50 },
51 FieldSchema {
52 tag: "MARKETING",
53 name: "Accepts Marketing",
54 default_value: "disallowed",
55 },
56 FieldSchema {
57 tag: "LOCALE",
58 name: "Locale",
59 default_value: "en",
60 },
61 FieldSchema {
62 tag: "GRADUATED",
63 name: "Graduated",
64 default_value: "",
65 },
66 FieldSchema {
67 tag: "COURSEID",
68 name: "Course ID",
69 default_value: "",
70 },
71 FieldSchema {
72 tag: "LANGGRPID",
73 name: "Course language Group ID",
74 default_value: "",
75 },
76 FieldSchema {
77 tag: "USERID",
78 name: "User ID",
79 default_value: "",
80 },
81 FieldSchema {
82 tag: "RESEARCH",
83 name: "Research consent",
84 default_value: "false",
85 },
86];
87
/// Merge tags that are never deleted by `ensure_mailchimp_schema`, even though
/// they are not listed in `REQUIRED_FIELDS`.
const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
/// When `true`, `ensure_mailchimp_schema` deletes merge fields that are neither
/// required nor excluded from removal.
const REMOVE_UNSUPPORTED_FIELDS: bool = false;
/// Minimum number of seconds between two unsubscribe-processing rounds (3 hours).
const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 3 * 60 * 60;

/// Period of the main synchronization loop, in seconds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of loop ticks after which a "still running" message is logged.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;

/// Poll interval, in seconds, handed to the Mailchimp batch helpers.
const BATCH_POLL_INTERVAL_SECS: u64 = 10;
/// Timeout, in seconds, handed to the Mailchimp batch helpers (5 minutes).
const BATCH_POLL_TIMEOUT_SECS: u64 = 5 * 60;
98
99#[derive(Debug)]
100struct SyncUser {
101 details: UserMarketingConsentWithDetails,
102 payload: serde_json::Value,
103}
104
105#[derive(Debug)]
106struct EmailSyncResult {
107 user_id: Uuid,
108 user_mailchimp_id: String,
109}
110
111pub async fn main() -> anyhow::Result<()> {
113 initialize_environment()?;
114
115 let config = initialize_configuration().await?;
116
117 let db_pool = initialize_database_pool(&config.database_url).await?;
118 let mut conn = db_pool.acquire().await?;
119
120 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
121 let mut ticks = 0;
122
123 let access_tokens =
124 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
125 &mut conn,
126 )
127 .await?;
128
129 for token in &access_tokens {
131 if let Err(e) = ensure_mailchimp_schema(
132 &token.mailchimp_mailing_list_id,
133 &token.server_prefix,
134 &token.access_token,
135 )
136 .await
137 {
138 error!(
139 "Failed to set up Mailchimp schema for list '{}': {:?}",
140 token.mailchimp_mailing_list_id, e
141 );
142 return Err(e);
143 }
144 }
145
146 info!("Starting mailchimp syncer (periodic reconciliation loop).");
147
148 let mut last_time_unsubscribes_processed = Instant::now();
149 let mut last_time_tags_synced = Instant::now();
150
151 loop {
152 interval.tick().await;
153 ticks += 1;
154
155 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
156 ticks = 0;
157 info!("Still syncing.");
158 }
159 let mut process_unsubscribes = false;
160 if last_time_unsubscribes_processed.elapsed().as_secs()
161 >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
162 {
163 process_unsubscribes = true;
164 last_time_unsubscribes_processed = Instant::now();
165 };
166
167 if last_time_tags_synced.elapsed().as_secs() >= 3600 {
169 info!("Stage: tag catalog sync (keep local tag metadata aligned with Mailchimp).");
170 for token in &access_tokens {
171 if let Err(e) = sync_tags_from_mailchimp(
172 &mut conn,
173 &token.mailchimp_mailing_list_id,
174 &token.access_token,
175 &token.server_prefix,
176 token.id,
177 token.course_language_group_id,
178 )
179 .await
180 {
181 error!(
182 "Failed to sync tags for list '{}': {:?}",
183 token.mailchimp_mailing_list_id, e
184 );
185 }
186 }
187 last_time_tags_synced = Instant::now();
188 }
189
190 if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
191 error!("Error during synchronization: {:?}", e);
192 if let Ok(sqlx::Error::Io(..)) = e.downcast::<sqlx::Error>() {
193 info!("syncer may have lost its connection to the db, trying to reconnect");
195 conn = db_pool.acquire().await?;
196 }
197 }
198 }
199}
200
201fn initialize_environment() -> anyhow::Result<()> {
203 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
205 dotenv().ok();
206 setup_tracing()?;
207 Ok(())
208}
209
/// Runtime configuration of the syncer.
struct SyncerConfig {
    /// Connection URL used to create the database pool.
    database_url: String,
}
214
215async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
217 let database_url = ProgramConfig::database_url_with_default();
218
219 Ok(SyncerConfig { database_url })
220}
221
222async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
224 PgPool::connect(database_url).await.map_err(|e| {
225 anyhow::anyhow!(
226 "Failed to connect to the database at {}: {:?}",
227 database_url,
228 e
229 )
230 })
231}
232
233async fn ensure_mailchimp_schema(
235 list_id: &str,
236 server_prefix: &str,
237 access_token: &DbSecret,
238) -> anyhow::Result<()> {
239 let existing_fields =
240 fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
241
242 if REMOVE_UNSUPPORTED_FIELDS {
243 for field in existing_fields.iter() {
245 if !REQUIRED_FIELDS
246 .iter()
247 .any(|r| r.tag == field.field_name.as_str())
248 && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
249 {
250 match remove_field_from_mailchimp(
251 list_id,
252 &field.field_id,
253 server_prefix,
254 access_token,
255 )
256 .await
257 {
258 Err(e) => {
259 warn!("Could not remove field '{}': {}", field.field_name, e);
260 }
261 _ => {
262 info!("Removed field '{}'", field.field_name);
263 }
264 }
265 }
266 }
267 }
268
269 for required_field in REQUIRED_FIELDS.iter() {
271 if !existing_fields
272 .iter()
273 .any(|f| f.field_name == required_field.tag)
274 {
275 match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
276 {
277 Err(e) => {
278 warn!(
279 "Failed to add required field '{}': {}",
280 required_field.name, e
281 );
282 }
283 _ => {
284 info!(
285 "Successfully added required field '{}'",
286 required_field.name
287 );
288 }
289 }
290 } else {
291 info!(
292 "Field '{}' already exists, skipping addition.",
293 required_field.name
294 );
295 }
296 }
297
298 Ok(())
299}
300
301async fn fetch_current_mailchimp_fields(
303 list_id: &str,
304 server_prefix: &str,
305 access_token: &DbSecret,
306) -> Result<Vec<MailchimpField>, anyhow::Error> {
307 let url = format!(
308 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
309 server_prefix, list_id
310 );
311
312 let response = REQWEST_CLIENT
313 .get(&url)
314 .header(
315 "Authorization",
316 format!("apikey {}", access_token.expose_secret()),
317 )
318 .send()
319 .await?;
320
321 if response.status().is_success() {
322 let json = response.json::<serde_json::Value>().await?;
323
324 let fields: Vec<MailchimpField> = json["merge_fields"]
325 .as_array()
326 .unwrap_or(&vec![])
327 .iter()
328 .filter_map(|field| {
329 let field_id = field["merge_id"].as_u64();
330 let field_name = field["tag"].as_str();
331
332 if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
333 Some(MailchimpField {
334 field_id: field_id.to_string(),
335 field_name: field_name.to_string(),
336 })
337 } else {
338 None
339 }
340 })
341 .collect();
342
343 Ok(fields)
344 } else {
345 let error_text = response.text().await?;
346 error!("Error fetching merge fields: {}", error_text);
347 Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
348 }
349}
350
351async fn add_field_to_mailchimp(
353 list_id: &str,
354 field_schema: &FieldSchema,
355 server_prefix: &str,
356 access_token: &DbSecret,
357) -> anyhow::Result<()> {
358 let url = format!(
359 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
360 server_prefix, list_id
361 );
362
363 let body = json!({
364 "tag": field_schema.tag,
365 "name": field_schema.name,
366 "type": "text",
367 "default_value": field_schema.default_value,
368 });
369
370 let response = REQWEST_CLIENT
371 .post(&url)
372 .header(
373 "Authorization",
374 format!("apikey {}", access_token.expose_secret()),
375 )
376 .json(&body)
377 .send()
378 .await?;
379
380 if response.status().is_success() {
381 Ok(())
382 } else {
383 let status = response.status();
384 let error_text = response
385 .text()
386 .await
387 .unwrap_or_else(|_| "No additional error info.".to_string());
388 Err(anyhow::anyhow!(
389 "Failed to add field to Mailchimp. Status: {}. Error: {}",
390 status,
391 error_text
392 ))
393 }
394}
395
396async fn remove_field_from_mailchimp(
398 list_id: &str,
399 field_id: &str,
400 server_prefix: &str,
401 access_token: &DbSecret,
402) -> anyhow::Result<()> {
403 let url = format!(
404 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
405 server_prefix, list_id, field_id
406 );
407
408 let response = REQWEST_CLIENT
409 .delete(&url)
410 .header(
411 "Authorization",
412 format!("apikey {}", access_token.expose_secret()),
413 )
414 .send()
415 .await?;
416
417 if response.status().is_success() {
418 Ok(())
419 } else {
420 let status = response.status();
421 let error_text = response
422 .text()
423 .await
424 .unwrap_or_else(|_| "No additional error info.".to_string());
425 Err(anyhow::anyhow!(
426 "Failed to remove field from Mailchimp. Status: {}. Error: {}",
427 status,
428 error_text
429 ))
430 }
431}
432
433pub async fn sync_tags_from_mailchimp(
435 conn: &mut PgConnection,
436 list_id: &str,
437 access_token: &DbSecret,
438 server_prefix: &str,
439 marketing_mailing_list_access_token_id: Uuid,
440 course_language_group_id: Uuid,
441) -> anyhow::Result<()> {
442 let url = format!(
443 "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
444 server_prefix, list_id
445 );
446
447 let response = REQWEST_CLIENT
448 .get(&url)
449 .header(
450 "Authorization",
451 format!("apikey {}", access_token.expose_secret()),
452 )
453 .send()
454 .await?;
455
456 let response_json = response.json::<serde_json::Value>().await?;
458
459 let mailchimp_tags = match response_json.get("tags") {
460 Some(tags) if tags.is_array() => tags
461 .as_array()
462 .ok_or_else(|| anyhow::anyhow!("tags field is not an array despite is_array() check"))?
463 .iter()
464 .filter_map(|tag| {
465 let name = tag.get("name")?.as_str()?.to_string();
466
467 let id = match tag.get("id") {
468 Some(serde_json::Value::Number(num)) => num.to_string(),
469 Some(serde_json::Value::String(str_id)) => str_id.clone(),
470 _ => return None,
471 };
472
473 Some((id, name))
474 })
475 .collect::<Vec<(String, String)>>(),
476 _ => {
477 warn!("No tags found for list '{}', skipping sync.", list_id);
478 return Ok(());
479 }
480 };
481
482 let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
484 conn,
485 course_language_group_id,
486 marketing_mailing_list_access_token_id,
487 )
488 .await?;
489
490 for (tag_id, tag_name) in &mailchimp_tags {
492 if let Some(db_tag) = db_tags.iter().find(|db_tag| {
493 db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
494 }) {
495 let db_tag_name = db_tag
496 .get("tag_name")
497 .and_then(|v| v.as_str())
498 .unwrap_or_default();
499 if db_tag_name != tag_name {
500 headless_lms_models::marketing_consents::upsert_tag(
501 conn,
502 course_language_group_id,
503 marketing_mailing_list_access_token_id,
504 tag_id.clone(),
505 tag_name.clone(),
506 )
507 .await?;
508 }
509 }
510 }
511
512 for db_tag in db_tags.iter() {
514 let db_tag_id = db_tag
515 .get("id")
516 .and_then(|v| v.as_str())
517 .map(|v| v.trim().to_string())
518 .unwrap_or_default();
519
520 if !mailchimp_tags
522 .iter()
523 .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
524 {
525 if db_tag_id.is_empty() {
526 warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
527 continue;
528 }
529 headless_lms_models::marketing_consents::delete_tag(
530 conn,
531 db_tag_id.clone(),
532 course_language_group_id,
533 )
534 .await?;
535 }
536 }
537
538 Ok(())
539}
540
541async fn sync_contacts(
544 conn: &mut PgConnection,
545 _config: &SyncerConfig,
546 process_unsubscribes: bool,
547) -> anyhow::Result<()> {
548 let access_tokens =
549 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
550 conn,
551 )
552 .await?;
553
554 let mut successfully_synced_user_ids = Vec::new();
555
556 for token in access_tokens {
558 let course_language_group_slug =
559 match headless_lms_models::course_language_groups::get_slug_by_id(
560 conn,
561 token.course_language_group_id,
562 )
563 .await
564 {
565 Ok(Some(s)) => Some(s),
566 Ok(None) => None,
567 Err(e) => {
568 error!(
569 course_language_group_id = %token.course_language_group_id,
570 "Failed to get course language group slug: {:?}",
571 e
572 );
573 return Err(e.into());
574 }
575 };
576
577 if process_unsubscribes {
579 info!(
580 "Stage: unsubscribe sync (apply Mailchimp compliance/unsubscribe changes locally)."
581 );
582 let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
583 &token.mailchimp_mailing_list_id,
584 &token.server_prefix,
585 &token.access_token,
586 1000,
587 )
588 .await?;
589
590 info!(
591 "Processing Mailchimp data for list: {}",
592 token.mailchimp_mailing_list_id
593 );
594
595 process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
596 }
597
598 let users_with_unsynced_emails =
600 headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
601 conn,
602 token.course_language_group_id,
603 )
604 .await?;
605
606 info!(
607 "Stage: email updates (ensure member identifiers stay correct). Found {} unsynced user email(s) for course language group: {}",
608 users_with_unsynced_emails.len(),
609 token.course_language_group_id
610 );
611
612 if !users_with_unsynced_emails.is_empty() {
613 let email_sync_results = update_emails_in_mailchimp(
614 users_with_unsynced_emails,
615 &token.mailchimp_mailing_list_id,
616 &token.server_prefix,
617 &token.access_token,
618 )
619 .await?;
620
621 let email_synced_user_ids: Vec<Uuid> =
622 email_sync_results.iter().map(|r| r.user_id).collect();
623 successfully_synced_user_ids.extend(email_synced_user_ids.clone());
624
625 if !email_sync_results.is_empty() {
626 let mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
627 email_sync_results
628 .iter()
629 .map(|r| (r.user_id, r.user_mailchimp_id.clone()))
630 .collect();
631 let user_details =
632 headless_lms_models::marketing_consents::fetch_user_marketing_consents_with_details_by_user_ids(
633 conn,
634 token.course_language_group_id,
635 &email_synced_user_ids,
636 )
637 .await?;
638 match sync_policy_tags_for_users(
639 &token,
640 &user_details,
641 &mailchimp_id_by_user,
642 Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
643 Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
644 )
645 .await
646 {
647 Ok(results) => {
648 let (ok, failed): (Vec<_>, Vec<_>) =
649 results.into_iter().partition(|r| r.success);
650 if !failed.is_empty() {
651 let sample: Vec<String> = failed
652 .iter()
653 .take(5)
654 .map(|r| {
655 format!(
656 "{}: {}",
657 r.user_id,
658 r.error.as_deref().unwrap_or("unknown error")
659 )
660 })
661 .collect();
662 warn!(
663 "Policy tag sync after email updates for list '{}' had {} failure(s) out of {}. Sample: {:?}",
664 token.mailchimp_mailing_list_id,
665 failed.len(),
666 ok.len() + failed.len(),
667 sample
668 );
669 }
670 }
671 Err(e) => {
672 error!(
673 "Failed to sync policy tags after email updates for list '{}': {:?}",
674 token.mailchimp_mailing_list_id, e
675 );
676 }
677 }
678 }
679 }
680
681 let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;
682
683 let unsynced_users_details =
685 headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
686 conn,
687 token.course_language_group_id,
688 )
689 .await?;
690
691 info!(
692 "Stage: member upsert (merge fields + consent). Found {} unsynced user consent(s) for course language group: {}",
693 unsynced_users_details.len(),
694 token.course_language_group_id
695 );
696
697 if !unsynced_users_details.is_empty() {
698 let consent_synced_user_ids =
699 send_users_to_mailchimp(conn, &token, &unsynced_users_details, tag_objects).await?;
700
701 if let Some(ref slug) = course_language_group_slug
702 && !consent_synced_user_ids.is_empty()
703 {
704 let mailchimp_id_mapping =
705 headless_lms_models::marketing_consents::fetch_user_mailchimp_id_mapping(
706 conn,
707 token.course_language_group_id,
708 &consent_synced_user_ids,
709 )
710 .await?;
711 if let Err(e) = sync_completed_tag_for_members(
712 &unsynced_users_details,
713 &consent_synced_user_ids,
714 &mailchimp_id_mapping,
715 slug,
716 &token,
717 )
718 .await
719 {
720 error!(
721 "Failed to sync completed tag for list '{}': {:?}",
722 token.mailchimp_mailing_list_id, e
723 );
724 }
725 }
726
727 successfully_synced_user_ids.extend(consent_synced_user_ids);
729 }
730 }
731
732 if !successfully_synced_user_ids.is_empty() {
734 match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
735 conn,
736 &successfully_synced_user_ids,
737 )
738 .await
739 {
740 Ok(_) => {
741 info!(
742 "Stage: mark synced (avoid repeat work). Successfully updated synced status for {} users.",
743 successfully_synced_user_ids.len()
744 );
745 }
746 Err(e) => {
747 error!(
748 "Failed to update synced status for {} users: {:?}",
749 successfully_synced_user_ids.len(),
750 e
751 );
752 }
753 }
754 }
755
756 Ok(())
757}
758
759pub async fn send_users_to_mailchimp(
761 conn: &mut PgConnection,
762 token: &MarketingMailingListAccessToken,
763 users_details: &[UserMarketingConsentWithDetails],
764 tag_objects: Vec<serde_json::Value>,
765) -> anyhow::Result<Vec<Uuid>> {
766 let mut users_to_sync = vec![];
767 let mut sent_user_ids = Vec::new();
768 let mut successfully_synced_user_ids = Vec::new();
769 let mut user_id_contact_id_pairs = Vec::new();
770
771 for user in users_details {
773 if let Some(ref subscription) = user.email_subscription_in_mailchimp
775 && subscription == "subscribed"
776 {
777 sent_user_ids.push(user.user_id);
778 let user_details = json!({
779 "email_address": user.email,
780 "status": user.email_subscription_in_mailchimp,
781 "merge_fields": {
782 "FNAME": user.first_name.clone().unwrap_or("".to_string()),
783 "LNAME": user.last_name.clone().unwrap_or("".to_string()),
784 "MARKETING": if user.consent { "allowed" } else { "disallowed" },
785 "LOCALE": user.locale,
786 "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
787 "USERID": user.user_id,
788 "COURSEID": user.course_id,
789 "LANGGRPID": user.course_language_group_id,
790 "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
791 "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
792 },
793 "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
794 });
795 users_to_sync.push(SyncUser {
796 details: user.clone(),
797 payload: user_details,
798 });
799 }
800 }
801
802 if users_to_sync.is_empty() {
803 info!("No new users to sync.");
804 return Ok(vec![]);
805 }
806
807 let url = format!(
808 "https://{}.api.mailchimp.com/3.0/lists/{}",
809 token.server_prefix, token.mailchimp_mailing_list_id
810 );
811
812 let total_chunks = users_to_sync.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
813 info!(
814 "Syncing {} members to list '{}' in {} chunk(s)",
815 users_to_sync.len(),
816 token.mailchimp_mailing_list_id,
817 total_chunks
818 );
819
820 for (chunk_index, chunk) in users_to_sync.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
821 info!(
822 "Syncing users chunk {}/{} ({} members)",
823 chunk_index + 1,
824 total_chunks,
825 chunk.len()
826 );
827 let chunk_members: Vec<serde_json::Value> =
828 chunk.iter().map(|user| user.payload.clone()).collect();
829 let batch_request = json!({
830 "members": chunk_members,
831 "update_existing": true
832 });
833
834 let response = REQWEST_CLIENT
835 .post(&url)
836 .header("Content-Type", "application/json")
837 .header(
838 "Authorization",
839 format!("apikey {}", token.access_token.expose_secret()),
840 )
841 .json(&batch_request)
842 .send()
843 .await?;
844
845 if !response.status().is_success() {
846 let status = response.status();
847 let error_text = response.text().await?;
848 return Err(anyhow::anyhow!(
849 "Error syncing users to Mailchimp. Status: {}. Error: {}",
850 status,
851 error_text
852 ));
853 }
854
855 let response_data: serde_json::Value = response.json().await?;
856 let mut chunk_contact_count = 0;
857 let mut chunk_mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
858 std::collections::HashMap::new();
859 for user in chunk {
860 if let Some(ref mailchimp_id) = user.details.user_mailchimp_id {
861 chunk_mailchimp_id_by_user
862 .entry(user.details.user_id)
863 .or_insert_with(|| mailchimp_id.clone());
864 }
865 }
866 for key in &["new_members", "updated_members"] {
867 if let Some(members) = response_data[key].as_array() {
868 for member in members {
869 if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
870 if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
871 successfully_synced_user_ids.push(uuid);
872 }
873 if let Some(contact_id) = member["contact_id"].as_str() {
874 user_id_contact_id_pairs
875 .push((user_id.to_string(), contact_id.to_string()));
876 if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
877 chunk_mailchimp_id_by_user.insert(uuid, contact_id.to_string());
878 }
879 chunk_contact_count += 1;
880 }
881 }
882 }
883 }
884 }
885 if let Some(errors) = response_data["errors"].as_array()
886 && !errors.is_empty()
887 {
888 let sample: Vec<String> = errors
889 .iter()
890 .take(5)
891 .map(|e| {
892 let email = e
893 .get("email_address")
894 .and_then(|v| v.as_str())
895 .unwrap_or("?");
896 let msg = e.get("error").and_then(|v| v.as_str()).unwrap_or_else(|| {
897 e.get("message").and_then(|v| v.as_str()).unwrap_or("?")
898 });
899 format!("{}: {}", email, msg)
900 })
901 .collect();
902 warn!(
903 "Mailchimp batch subscribe chunk {}/{} returned {} error(s) (e.g. unsubscribed/rejected). Sample: {:?}",
904 chunk_index + 1,
905 total_chunks,
906 errors.len(),
907 sample
908 );
909 }
910 info!(
911 "Chunk {}/{}: {} contact_id(s) from new_members/updated_members",
912 chunk_index + 1,
913 total_chunks,
914 chunk_contact_count
915 );
916
917 let chunk_users: Vec<UserMarketingConsentWithDetails> =
918 chunk.iter().map(|user| user.details.clone()).collect();
919 match sync_policy_tags_for_users(
920 token,
921 &chunk_users,
922 &chunk_mailchimp_id_by_user,
923 Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
924 Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
925 )
926 .await
927 {
928 Ok(results) => {
929 let (ok, failed): (Vec<_>, Vec<_>) = results.into_iter().partition(|r| r.success);
930 if !failed.is_empty() {
931 let sample: Vec<String> = failed
932 .iter()
933 .take(5)
934 .map(|r| {
935 format!(
936 "{}: {}",
937 r.user_id,
938 r.error.as_deref().unwrap_or("unknown error")
939 )
940 })
941 .collect();
942 warn!(
943 "Policy tag sync for list '{}' chunk {}/{} had {} failure(s) out of {}. Sample: {:?}",
944 token.mailchimp_mailing_list_id,
945 chunk_index + 1,
946 total_chunks,
947 failed.len(),
948 ok.len() + failed.len(),
949 sample
950 );
951 }
952 }
953 Err(e) => {
954 error!(
955 "Failed to sync policy tags for list '{}' chunk {}/{}: {:?}",
956 token.mailchimp_mailing_list_id,
957 chunk_index + 1,
958 total_chunks,
959 e
960 );
961 }
962 }
963 }
964
965 let got_contact_id_set: std::collections::HashSet<Uuid> = user_id_contact_id_pairs
966 .iter()
967 .filter_map(|(uid, _)| Uuid::parse_str(uid).ok())
968 .collect();
969 let no_contact_id_user_ids: Vec<Uuid> = sent_user_ids
970 .iter()
971 .filter(|id| !got_contact_id_set.contains(id))
972 .copied()
973 .collect();
974
975 if !no_contact_id_user_ids.is_empty() {
976 let sample_len = no_contact_id_user_ids.len().min(10);
977 warn!(
978 "Mailchimp did not return contact_id for {} member(s) (likely unsubscribed, removed, or rejected). Marking synced_to_mailchimp_at to stop retry. First {} user_id(s): {:?}",
979 no_contact_id_user_ids.len(),
980 sample_len,
981 &no_contact_id_user_ids[..sample_len]
982 );
983 if let Err(e) = headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
984 conn,
985 &no_contact_id_user_ids,
986 )
987 .await
988 {
989 error!(
990 "Failed to update synced_to_mailchimp_at for no-contact_id users: {:?}",
991 e
992 );
993 }
994 }
995
996 info!(
997 "Batch subscribe list '{}': sent {} member(s), got {} contact_id(s){}",
998 token.mailchimp_mailing_list_id,
999 sent_user_ids.len(),
1000 user_id_contact_id_pairs.len(),
1001 if no_contact_id_user_ids.is_empty() {
1002 String::new()
1003 } else {
1004 format!(
1005 ", {} without contact_id (marked synced to stop retry)",
1006 no_contact_id_user_ids.len()
1007 )
1008 }
1009 );
1010
1011 if !user_id_contact_id_pairs.is_empty() {
1012 headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
1013 conn,
1014 user_id_contact_id_pairs,
1015 )
1016 .await?;
1017 }
1018
1019 Ok(successfully_synced_user_ids)
1020}
1021
1022async fn sync_completed_tag_for_members(
1024 users_details: &[UserMarketingConsentWithDetails],
1025 successfully_synced_user_ids: &[Uuid],
1026 mailchimp_id_by_user: &std::collections::HashMap<Uuid, String>,
1027 slug: &str,
1028 token: &MarketingMailingListAccessToken,
1029) -> anyhow::Result<()> {
1030 let tag_name = format!("{}-completed", slug);
1031 let success_set: std::collections::HashSet<_> = successfully_synced_user_ids.iter().collect();
1032
1033 let mut operations = Vec::new();
1034 for user in users_details {
1035 if !success_set.contains(&user.user_id) {
1036 continue;
1037 }
1038 let Some(user_mailchimp_id) = mailchimp_id_by_user.get(&user.user_id) else {
1039 continue;
1040 };
1041 let status = if user.completed_course_at.is_some() {
1042 "active"
1043 } else {
1044 "inactive"
1045 };
1046 operations.push(MailchimpOperation {
1047 method: Method::POST,
1048 path: format!(
1049 "/lists/{}/members/{}/tags",
1050 token.mailchimp_mailing_list_id, user_mailchimp_id
1051 ),
1052 body: Some(json!({
1053 "tags": [
1054 { "name": tag_name, "status": status }
1055 ]
1056 })),
1057 operation_id: Some(user.user_id.to_string()),
1058 });
1059 }
1060
1061 if operations.is_empty() {
1062 return Ok(());
1063 }
1064
1065 let ops_count = operations.len();
1066 info!(
1067 "Stage: completion tags (mark course completion). Preparing {} operation(s) for list '{}'",
1068 ops_count, token.mailchimp_mailing_list_id
1069 );
1070
1071 let timeout = Duration::from_secs(BATCH_POLL_TIMEOUT_SECS);
1072 let poll_interval = Duration::from_secs(BATCH_POLL_INTERVAL_SECS);
1073 let executor = MailchimpExecutor::new(timeout, poll_interval);
1074
1075 let start_time = Instant::now();
1076 let results = executor.execute(token, operations).await?;
1077 let failures: Vec<_> = results.iter().filter(|r| !r.is_success()).collect();
1078 if !failures.is_empty() {
1079 let sample: Vec<String> = failures
1080 .iter()
1081 .take(5)
1082 .map(|r| {
1083 format!(
1084 "{}: {}",
1085 r.operation_id.as_deref().unwrap_or("?"),
1086 r.error.as_deref().unwrap_or("unknown error")
1087 )
1088 })
1089 .collect();
1090 warn!(
1091 "Completion tag sync for list '{}' had {} failure(s) out of {}. Sample: {:?}",
1092 token.mailchimp_mailing_list_id,
1093 failures.len(),
1094 results.len(),
1095 sample
1096 );
1097 return Err(anyhow::anyhow!("completion tag sync failed"));
1098 }
1099
1100 info!(
1101 "Completed sync of {} completion tags for list '{}' in {:.2}s",
1102 ops_count,
1103 token.mailchimp_mailing_list_id,
1104 start_time.elapsed().as_secs_f64()
1105 );
1106
1107 Ok(())
1108}
1109
1110async fn update_emails_in_mailchimp(
1112 users: Vec<UserEmailSubscription>,
1113 list_id: &str,
1114 server_prefix: &str,
1115 access_token: &DbSecret,
1116) -> anyhow::Result<Vec<EmailSyncResult>> {
1117 let mut successfully_synced_users = Vec::new();
1118 let mut failed_user_ids = Vec::new();
1119
1120 for user in users {
1121 if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
1122 if let Some(ref status) = user.email_subscription_in_mailchimp
1123 && status != "subscribed"
1124 {
1125 continue; }
1127
1128 let url = format!(
1129 "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
1130 server_prefix, list_id, user_mailchimp_id
1131 );
1132
1133 let body = serde_json::json!({
1135 "email_address": &user.email,
1136 "status": &user.email_subscription_in_mailchimp,
1137 });
1138
1139 let update_response = REQWEST_CLIENT
1141 .put(&url)
1142 .header(
1143 "Authorization",
1144 format!("apikey {}", access_token.expose_secret()),
1145 )
1146 .json(&body)
1147 .send()
1148 .await?;
1149
1150 if update_response.status().is_success() {
1151 successfully_synced_users.push(EmailSyncResult {
1152 user_id: user.user_id,
1153 user_mailchimp_id: user_mailchimp_id.clone(),
1154 });
1155 } else {
1156 failed_user_ids.push(user.user_id);
1157 }
1158 } else {
1159 continue;
1160 }
1161 }
1162
1163 if !failed_user_ids.is_empty() {
1164 info!("Failed to update the following users:");
1165 for user_id in &failed_user_ids {
1166 error!("User ID: {}", user_id);
1167 }
1168 }
1169
1170 Ok(successfully_synced_users)
1171}
1172
1173async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
1175 list_id: &str,
1176 server_prefix: &str,
1177 access_token: &DbSecret,
1178 chunk_size: usize,
1179) -> anyhow::Result<Vec<(String, String, String, String)>> {
1180 let mut all_data = Vec::new();
1181 let mut offset = 0;
1182
1183 loop {
1184 let url = format!(
1185 "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
1186 server_prefix, list_id, offset, chunk_size
1187 );
1188
1189 let response = REQWEST_CLIENT
1190 .get(&url)
1191 .header(
1192 "Authorization",
1193 format!("apikey {}", access_token.expose_secret()),
1194 )
1195 .send()
1196 .await?
1197 .json::<serde_json::Value>()
1198 .await?;
1199
1200 let empty_vec = vec![];
1201 let members = response["members"].as_array().unwrap_or(&empty_vec);
1202 if members.is_empty() {
1203 break;
1204 }
1205
1206 for member in members {
1207 if let (Some(status), Some(last_changed), Some(merge_fields)) = (
1209 member["status"].as_str(),
1210 member["last_changed"].as_str(),
1211 member["merge_fields"].as_object(),
1212 ) {
1213 if let (Some(user_id), Some(language_group_id)) = (
1215 merge_fields.get("USERID").and_then(|v| v.as_str()),
1216 merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
1217 ) {
1218 if !user_id.is_empty() && !language_group_id.is_empty() {
1220 all_data.push((
1221 user_id.to_string(),
1222 last_changed.to_string(),
1223 language_group_id.to_string(),
1224 status.to_string(),
1225 ));
1226 }
1227 }
1228 }
1229 }
1230
1231 let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
1233 if offset + chunk_size >= total_items {
1234 break;
1235 }
1236
1237 offset += chunk_size;
1238 }
1239
1240 Ok(all_data)
1241}
1242
/// Number of unsubscribe records written to the database per bulk update.
const BATCH_SIZE: usize = 1_000;
1244
1245async fn process_unsubscribed_users_from_mailchimp(
1246 conn: &mut PgConnection,
1247 mailchimp_data: Vec<(String, String, String, String)>,
1248) -> anyhow::Result<()> {
1249 let total_records = mailchimp_data.len();
1250 let total_chunks = total_records.div_ceil(BATCH_SIZE);
1251
1252 for (chunk_num, chunk) in mailchimp_data.chunks(BATCH_SIZE).enumerate() {
1253 if chunk.is_empty() {
1254 continue;
1255 }
1256
1257 if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
1258 conn,
1259 chunk.to_vec(),
1260 )
1261 .await
1262 {
1263 error!(
1264 "Error while processing chunk {}/{}: {}",
1265 chunk_num + 1,
1266 total_chunks,
1267 e
1268 );
1269 }
1270 }
1271
1272 Ok(())
1273}