1use crate::prelude::*;
2use crate::setup_tracing;
3use dotenv::dotenv;
4use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
5use headless_lms_models::marketing_consents::UserEmailSubscription;
6use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
7use headless_lms_utils::http::REQWEST_CLIENT;
8use serde_json::json;
9use sqlx::{PgConnection, PgPool};
10use std::{
11 env,
12 time::{Duration, Instant},
13};
14use uuid::Uuid;
15
16mod batch_client;
17mod mailchimp_ops;
18mod policy_tags;
19
20use batch_client::MAX_MAILCHIMP_BATCH_SIZE;
21use mailchimp_ops::{MailchimpExecutor, MailchimpOperation};
22use policy_tags::sync_policy_tags_for_users;
23use reqwest::Method;
24
/// A merge field of a Mailchimp list, as listed by `GET /lists/{list_id}/merge-fields`.
///
/// In this file, instances are built by hand from the response JSON in
/// `fetch_current_mailchimp_fields`.
#[derive(Debug, Deserialize)]
struct MailchimpField {
    /// The field's numeric `merge_id`, stringified; used in the delete URL.
    field_id: String,
    /// The field's merge `tag` (e.g. `FNAME`), not its human-readable name.
    field_name: String,
}
30
/// Declarative description of a Mailchimp merge field the syncer relies on.
#[derive(Debug)]
struct FieldSchema {
    /// Merge tag; the key used inside a member's `merge_fields` object.
    tag: &'static str,
    /// Human-readable label sent as `name` when the field is created.
    name: &'static str,
    /// Sent as `default_value` when the field is created.
    default_value: &'static str,
}

/// Row constructor so the table below reads as one line per merge field.
const fn merge_field(
    tag: &'static str,
    name: &'static str,
    default_value: &'static str,
) -> FieldSchema {
    FieldSchema {
        tag,
        name,
        default_value,
    }
}

/// Merge fields that `ensure_mailchimp_schema` creates on every configured list when missing.
const REQUIRED_FIELDS: &[FieldSchema] = &[
    merge_field("FNAME", "First Name", ""),
    merge_field("LNAME", "Last Name", ""),
    merge_field("MARKETING", "Accepts Marketing", "disallowed"),
    merge_field("LOCALE", "Locale", "en"),
    merge_field("GRADUATED", "Graduated", ""),
    merge_field("COURSEID", "Course ID", ""),
    merge_field("LANGGRPID", "Course language Group ID", ""),
    merge_field("USERID", "User ID", ""),
    merge_field("RESEARCH", "Research consent", "false"),
];
85
/// Merge tags that are never deleted, even when `REMOVE_UNSUPPORTED_FIELDS` is enabled.
const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
/// Master switch for deleting merge fields that are neither required nor excluded above.
const REMOVE_UNSUPPORTED_FIELDS: bool = false;
/// Minimum time between two unsubscribe imports from Mailchimp: three hours.
const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 3 * 60 * 60;

/// Period of the main sync loop, in seconds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of loop iterations between two "Still syncing." log lines.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;

/// Poll interval, in seconds, handed to Mailchimp batch executions.
const BATCH_POLL_INTERVAL_SECS: u64 = 10;
/// Timeout, in seconds, handed to Mailchimp batch executions: five minutes.
const BATCH_POLL_TIMEOUT_SECS: u64 = 5 * 60;
96
/// A user selected for the batch-subscribe request, paired with the member JSON sent for it.
#[derive(Debug)]
struct SyncUser {
    /// The consent/details row the payload was built from.
    details: UserMarketingConsentWithDetails,
    /// Member object (`email_address`, `status`, `merge_fields`, `tags`) for the batch request.
    payload: serde_json::Value,
}

/// Result of a successful e-mail address update for one user.
#[derive(Debug)]
struct EmailSyncResult {
    /// Local user id.
    user_id: Uuid,
    /// Mailchimp member id that was used in the update request path.
    user_mailchimp_id: String,
}
108
109pub async fn main() -> anyhow::Result<()> {
111 initialize_environment()?;
112
113 let config = initialize_configuration().await?;
114
115 let db_pool = initialize_database_pool(&config.database_url).await?;
116 let mut conn = db_pool.acquire().await?;
117
118 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
119 let mut ticks = 0;
120
121 let access_tokens =
122 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
123 &mut conn,
124 )
125 .await?;
126
127 for token in &access_tokens {
129 if let Err(e) = ensure_mailchimp_schema(
130 &token.mailchimp_mailing_list_id,
131 &token.server_prefix,
132 &token.access_token,
133 )
134 .await
135 {
136 error!(
137 "Failed to set up Mailchimp schema for list '{}': {:?}",
138 token.mailchimp_mailing_list_id, e
139 );
140 return Err(e);
141 }
142 }
143
144 info!("Starting mailchimp syncer (periodic reconciliation loop).");
145
146 let mut last_time_unsubscribes_processed = Instant::now();
147 let mut last_time_tags_synced = Instant::now();
148
149 loop {
150 interval.tick().await;
151 ticks += 1;
152
153 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
154 ticks = 0;
155 info!("Still syncing.");
156 }
157 let mut process_unsubscribes = false;
158 if last_time_unsubscribes_processed.elapsed().as_secs()
159 >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
160 {
161 process_unsubscribes = true;
162 last_time_unsubscribes_processed = Instant::now();
163 };
164
165 if last_time_tags_synced.elapsed().as_secs() >= 3600 {
167 info!("Stage: tag catalog sync (keep local tag metadata aligned with Mailchimp).");
168 for token in &access_tokens {
169 if let Err(e) = sync_tags_from_mailchimp(
170 &mut conn,
171 &token.mailchimp_mailing_list_id,
172 &token.access_token,
173 &token.server_prefix,
174 token.id,
175 token.course_language_group_id,
176 )
177 .await
178 {
179 error!(
180 "Failed to sync tags for list '{}': {:?}",
181 token.mailchimp_mailing_list_id, e
182 );
183 }
184 }
185 last_time_tags_synced = Instant::now();
186 }
187
188 if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
189 error!("Error during synchronization: {:?}", e);
190 if let Ok(sqlx::Error::Io(..)) = e.downcast::<sqlx::Error>() {
191 info!("syncer may have lost its connection to the db, trying to reconnect");
193 conn = db_pool.acquire().await?;
194 }
195 }
196 }
197}
198
199fn initialize_environment() -> anyhow::Result<()> {
201 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
203 dotenv().ok();
204 setup_tracing()?;
205 Ok(())
206}
207
/// Runtime configuration of the syncer, read from the environment.
struct SyncerConfig {
    /// Postgres connection string (`DATABASE_URL`, with a local development fallback).
    database_url: String,
}
212
213async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
215 let database_url = env::var("DATABASE_URL")
216 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
217
218 Ok(SyncerConfig { database_url })
219}
220
221async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
223 PgPool::connect(database_url).await.map_err(|e| {
224 anyhow::anyhow!(
225 "Failed to connect to the database at {}: {:?}",
226 database_url,
227 e
228 )
229 })
230}
231
232async fn ensure_mailchimp_schema(
234 list_id: &str,
235 server_prefix: &str,
236 access_token: &str,
237) -> anyhow::Result<()> {
238 let existing_fields =
239 fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
240
241 if REMOVE_UNSUPPORTED_FIELDS {
242 for field in existing_fields.iter() {
244 if !REQUIRED_FIELDS
245 .iter()
246 .any(|r| r.tag == field.field_name.as_str())
247 && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
248 {
249 match remove_field_from_mailchimp(
250 list_id,
251 &field.field_id,
252 server_prefix,
253 access_token,
254 )
255 .await
256 {
257 Err(e) => {
258 warn!("Could not remove field '{}': {}", field.field_name, e);
259 }
260 _ => {
261 info!("Removed field '{}'", field.field_name);
262 }
263 }
264 }
265 }
266 }
267
268 for required_field in REQUIRED_FIELDS.iter() {
270 if !existing_fields
271 .iter()
272 .any(|f| f.field_name == required_field.tag)
273 {
274 match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
275 {
276 Err(e) => {
277 warn!(
278 "Failed to add required field '{}': {}",
279 required_field.name, e
280 );
281 }
282 _ => {
283 info!(
284 "Successfully added required field '{}'",
285 required_field.name
286 );
287 }
288 }
289 } else {
290 info!(
291 "Field '{}' already exists, skipping addition.",
292 required_field.name
293 );
294 }
295 }
296
297 Ok(())
298}
299
300async fn fetch_current_mailchimp_fields(
302 list_id: &str,
303 server_prefix: &str,
304 access_token: &str,
305) -> Result<Vec<MailchimpField>, anyhow::Error> {
306 let url = format!(
307 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
308 server_prefix, list_id
309 );
310
311 let response = REQWEST_CLIENT
312 .get(&url)
313 .header("Authorization", format!("apikey {}", access_token))
314 .send()
315 .await?;
316
317 if response.status().is_success() {
318 let json = response.json::<serde_json::Value>().await?;
319
320 let fields: Vec<MailchimpField> = json["merge_fields"]
321 .as_array()
322 .unwrap_or(&vec![])
323 .iter()
324 .filter_map(|field| {
325 let field_id = field["merge_id"].as_u64();
326 let field_name = field["tag"].as_str();
327
328 if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
329 Some(MailchimpField {
330 field_id: field_id.to_string(),
331 field_name: field_name.to_string(),
332 })
333 } else {
334 None
335 }
336 })
337 .collect();
338
339 Ok(fields)
340 } else {
341 let error_text = response.text().await?;
342 error!("Error fetching merge fields: {}", error_text);
343 Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
344 }
345}
346
347async fn add_field_to_mailchimp(
349 list_id: &str,
350 field_schema: &FieldSchema,
351 server_prefix: &str,
352 access_token: &str,
353) -> anyhow::Result<()> {
354 let url = format!(
355 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
356 server_prefix, list_id
357 );
358
359 let body = json!({
360 "tag": field_schema.tag,
361 "name": field_schema.name,
362 "type": "text",
363 "default_value": field_schema.default_value,
364 });
365
366 let response = REQWEST_CLIENT
367 .post(&url)
368 .header("Authorization", format!("apikey {}", access_token))
369 .json(&body)
370 .send()
371 .await?;
372
373 if response.status().is_success() {
374 Ok(())
375 } else {
376 let status = response.status();
377 let error_text = response
378 .text()
379 .await
380 .unwrap_or_else(|_| "No additional error info.".to_string());
381 Err(anyhow::anyhow!(
382 "Failed to add field to Mailchimp. Status: {}. Error: {}",
383 status,
384 error_text
385 ))
386 }
387}
388
389async fn remove_field_from_mailchimp(
391 list_id: &str,
392 field_id: &str,
393 server_prefix: &str,
394 access_token: &str,
395) -> anyhow::Result<()> {
396 let url = format!(
397 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
398 server_prefix, list_id, field_id
399 );
400
401 let response = REQWEST_CLIENT
402 .delete(&url)
403 .header("Authorization", format!("apikey {}", access_token))
404 .send()
405 .await?;
406
407 if response.status().is_success() {
408 Ok(())
409 } else {
410 let status = response.status();
411 let error_text = response
412 .text()
413 .await
414 .unwrap_or_else(|_| "No additional error info.".to_string());
415 Err(anyhow::anyhow!(
416 "Failed to remove field from Mailchimp. Status: {}. Error: {}",
417 status,
418 error_text
419 ))
420 }
421}
422
423pub async fn sync_tags_from_mailchimp(
425 conn: &mut PgConnection,
426 list_id: &str,
427 access_token: &str,
428 server_prefix: &str,
429 marketing_mailing_list_access_token_id: Uuid,
430 course_language_group_id: Uuid,
431) -> anyhow::Result<()> {
432 let url = format!(
433 "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
434 server_prefix, list_id
435 );
436
437 let response = REQWEST_CLIENT
438 .get(&url)
439 .header("Authorization", format!("apikey {}", access_token))
440 .send()
441 .await?;
442
443 let response_json = response.json::<serde_json::Value>().await?;
445
446 let mailchimp_tags = match response_json.get("tags") {
447 Some(tags) if tags.is_array() => tags
448 .as_array()
449 .ok_or_else(|| anyhow::anyhow!("tags field is not an array despite is_array() check"))?
450 .iter()
451 .filter_map(|tag| {
452 let name = tag.get("name")?.as_str()?.to_string();
453
454 let id = match tag.get("id") {
455 Some(serde_json::Value::Number(num)) => num.to_string(),
456 Some(serde_json::Value::String(str_id)) => str_id.clone(),
457 _ => return None,
458 };
459
460 Some((id, name))
461 })
462 .collect::<Vec<(String, String)>>(),
463 _ => {
464 warn!("No tags found for list '{}', skipping sync.", list_id);
465 return Ok(());
466 }
467 };
468
469 let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
471 conn,
472 course_language_group_id,
473 marketing_mailing_list_access_token_id,
474 )
475 .await?;
476
477 for (tag_id, tag_name) in &mailchimp_tags {
479 if let Some(db_tag) = db_tags.iter().find(|db_tag| {
480 db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
481 }) {
482 let db_tag_name = db_tag
483 .get("tag_name")
484 .and_then(|v| v.as_str())
485 .unwrap_or_default();
486 if db_tag_name != tag_name {
487 headless_lms_models::marketing_consents::upsert_tag(
488 conn,
489 course_language_group_id,
490 marketing_mailing_list_access_token_id,
491 tag_id.clone(),
492 tag_name.clone(),
493 )
494 .await?;
495 }
496 }
497 }
498
499 for db_tag in db_tags.iter() {
501 let db_tag_id = db_tag
502 .get("id")
503 .and_then(|v| v.as_str())
504 .map(|v| v.trim().to_string())
505 .unwrap_or_default();
506
507 if !mailchimp_tags
509 .iter()
510 .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
511 {
512 if db_tag_id.is_empty() {
513 warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
514 continue;
515 }
516 headless_lms_models::marketing_consents::delete_tag(
517 conn,
518 db_tag_id.clone(),
519 course_language_group_id,
520 )
521 .await?;
522 }
523 }
524
525 Ok(())
526}
527
528async fn sync_contacts(
531 conn: &mut PgConnection,
532 _config: &SyncerConfig,
533 process_unsubscribes: bool,
534) -> anyhow::Result<()> {
535 let access_tokens =
536 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
537 conn,
538 )
539 .await?;
540
541 let mut successfully_synced_user_ids = Vec::new();
542
543 for token in access_tokens {
545 let course_language_group_slug =
546 match headless_lms_models::course_language_groups::get_slug_by_id(
547 conn,
548 token.course_language_group_id,
549 )
550 .await
551 {
552 Ok(Some(s)) => Some(s),
553 Ok(None) => None,
554 Err(e) => {
555 error!(
556 course_language_group_id = %token.course_language_group_id,
557 "Failed to get course language group slug: {:?}",
558 e
559 );
560 return Err(e.into());
561 }
562 };
563
564 if process_unsubscribes {
566 info!(
567 "Stage: unsubscribe sync (apply Mailchimp compliance/unsubscribe changes locally)."
568 );
569 let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
570 &token.mailchimp_mailing_list_id,
571 &token.server_prefix,
572 &token.access_token,
573 1000,
574 )
575 .await?;
576
577 info!(
578 "Processing Mailchimp data for list: {}",
579 token.mailchimp_mailing_list_id
580 );
581
582 process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
583 }
584
585 let users_with_unsynced_emails =
587 headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
588 conn,
589 token.course_language_group_id,
590 )
591 .await?;
592
593 info!(
594 "Stage: email updates (ensure member identifiers stay correct). Found {} unsynced user email(s) for course language group: {}",
595 users_with_unsynced_emails.len(),
596 token.course_language_group_id
597 );
598
599 if !users_with_unsynced_emails.is_empty() {
600 let email_sync_results = update_emails_in_mailchimp(
601 users_with_unsynced_emails,
602 &token.mailchimp_mailing_list_id,
603 &token.server_prefix,
604 &token.access_token,
605 )
606 .await?;
607
608 let email_synced_user_ids: Vec<Uuid> =
609 email_sync_results.iter().map(|r| r.user_id).collect();
610 successfully_synced_user_ids.extend(email_synced_user_ids.clone());
611
612 if !email_sync_results.is_empty() {
613 let mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
614 email_sync_results
615 .iter()
616 .map(|r| (r.user_id, r.user_mailchimp_id.clone()))
617 .collect();
618 let user_details =
619 headless_lms_models::marketing_consents::fetch_user_marketing_consents_with_details_by_user_ids(
620 conn,
621 token.course_language_group_id,
622 &email_synced_user_ids,
623 )
624 .await?;
625 match sync_policy_tags_for_users(
626 &token,
627 &user_details,
628 &mailchimp_id_by_user,
629 Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
630 Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
631 )
632 .await
633 {
634 Ok(results) => {
635 let (ok, failed): (Vec<_>, Vec<_>) =
636 results.into_iter().partition(|r| r.success);
637 if !failed.is_empty() {
638 let sample: Vec<String> = failed
639 .iter()
640 .take(5)
641 .map(|r| {
642 format!(
643 "{}: {}",
644 r.user_id,
645 r.error.as_deref().unwrap_or("unknown error")
646 )
647 })
648 .collect();
649 warn!(
650 "Policy tag sync after email updates for list '{}' had {} failure(s) out of {}. Sample: {:?}",
651 token.mailchimp_mailing_list_id,
652 failed.len(),
653 ok.len() + failed.len(),
654 sample
655 );
656 }
657 }
658 Err(e) => {
659 error!(
660 "Failed to sync policy tags after email updates for list '{}': {:?}",
661 token.mailchimp_mailing_list_id, e
662 );
663 }
664 }
665 }
666 }
667
668 let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;
669
670 let unsynced_users_details =
672 headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
673 conn,
674 token.course_language_group_id,
675 )
676 .await?;
677
678 info!(
679 "Stage: member upsert (merge fields + consent). Found {} unsynced user consent(s) for course language group: {}",
680 unsynced_users_details.len(),
681 token.course_language_group_id
682 );
683
684 if !unsynced_users_details.is_empty() {
685 let consent_synced_user_ids =
686 send_users_to_mailchimp(conn, &token, &unsynced_users_details, tag_objects).await?;
687
688 if let Some(ref slug) = course_language_group_slug
689 && !consent_synced_user_ids.is_empty()
690 {
691 let mailchimp_id_mapping =
692 headless_lms_models::marketing_consents::fetch_user_mailchimp_id_mapping(
693 conn,
694 token.course_language_group_id,
695 &consent_synced_user_ids,
696 )
697 .await?;
698 if let Err(e) = sync_completed_tag_for_members(
699 &unsynced_users_details,
700 &consent_synced_user_ids,
701 &mailchimp_id_mapping,
702 slug,
703 &token,
704 )
705 .await
706 {
707 error!(
708 "Failed to sync completed tag for list '{}': {:?}",
709 token.mailchimp_mailing_list_id, e
710 );
711 }
712 }
713
714 successfully_synced_user_ids.extend(consent_synced_user_ids);
716 }
717 }
718
719 if !successfully_synced_user_ids.is_empty() {
721 match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
722 conn,
723 &successfully_synced_user_ids,
724 )
725 .await
726 {
727 Ok(_) => {
728 info!(
729 "Stage: mark synced (avoid repeat work). Successfully updated synced status for {} users.",
730 successfully_synced_user_ids.len()
731 );
732 }
733 Err(e) => {
734 error!(
735 "Failed to update synced status for {} users: {:?}",
736 successfully_synced_user_ids.len(),
737 e
738 );
739 }
740 }
741 }
742
743 Ok(())
744}
745
746pub async fn send_users_to_mailchimp(
748 conn: &mut PgConnection,
749 token: &MarketingMailingListAccessToken,
750 users_details: &[UserMarketingConsentWithDetails],
751 tag_objects: Vec<serde_json::Value>,
752) -> anyhow::Result<Vec<Uuid>> {
753 let mut users_to_sync = vec![];
754 let mut sent_user_ids = Vec::new();
755 let mut successfully_synced_user_ids = Vec::new();
756 let mut user_id_contact_id_pairs = Vec::new();
757
758 for user in users_details {
760 if let Some(ref subscription) = user.email_subscription_in_mailchimp
762 && subscription == "subscribed"
763 {
764 sent_user_ids.push(user.user_id);
765 let user_details = json!({
766 "email_address": user.email,
767 "status": user.email_subscription_in_mailchimp,
768 "merge_fields": {
769 "FNAME": user.first_name.clone().unwrap_or("".to_string()),
770 "LNAME": user.last_name.clone().unwrap_or("".to_string()),
771 "MARKETING": if user.consent { "allowed" } else { "disallowed" },
772 "LOCALE": user.locale,
773 "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
774 "USERID": user.user_id,
775 "COURSEID": user.course_id,
776 "LANGGRPID": user.course_language_group_id,
777 "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
778 "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
779 },
780 "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
781 });
782 users_to_sync.push(SyncUser {
783 details: user.clone(),
784 payload: user_details,
785 });
786 }
787 }
788
789 if users_to_sync.is_empty() {
790 info!("No new users to sync.");
791 return Ok(vec![]);
792 }
793
794 let url = format!(
795 "https://{}.api.mailchimp.com/3.0/lists/{}",
796 token.server_prefix, token.mailchimp_mailing_list_id
797 );
798
799 let total_chunks = users_to_sync.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
800 info!(
801 "Syncing {} members to list '{}' in {} chunk(s)",
802 users_to_sync.len(),
803 token.mailchimp_mailing_list_id,
804 total_chunks
805 );
806
807 for (chunk_index, chunk) in users_to_sync.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
808 info!(
809 "Syncing users chunk {}/{} ({} members)",
810 chunk_index + 1,
811 total_chunks,
812 chunk.len()
813 );
814 let chunk_members: Vec<serde_json::Value> =
815 chunk.iter().map(|user| user.payload.clone()).collect();
816 let batch_request = json!({
817 "members": chunk_members,
818 "update_existing": true
819 });
820
821 let response = REQWEST_CLIENT
822 .post(&url)
823 .header("Content-Type", "application/json")
824 .header("Authorization", format!("apikey {}", token.access_token))
825 .json(&batch_request)
826 .send()
827 .await?;
828
829 if !response.status().is_success() {
830 let status = response.status();
831 let error_text = response.text().await?;
832 return Err(anyhow::anyhow!(
833 "Error syncing users to Mailchimp. Status: {}. Error: {}",
834 status,
835 error_text
836 ));
837 }
838
839 let response_data: serde_json::Value = response.json().await?;
840 let mut chunk_contact_count = 0;
841 let mut chunk_mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
842 std::collections::HashMap::new();
843 for user in chunk {
844 if let Some(ref mailchimp_id) = user.details.user_mailchimp_id {
845 chunk_mailchimp_id_by_user
846 .entry(user.details.user_id)
847 .or_insert_with(|| mailchimp_id.clone());
848 }
849 }
850 for key in &["new_members", "updated_members"] {
851 if let Some(members) = response_data[key].as_array() {
852 for member in members {
853 if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
854 if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
855 successfully_synced_user_ids.push(uuid);
856 }
857 if let Some(contact_id) = member["contact_id"].as_str() {
858 user_id_contact_id_pairs
859 .push((user_id.to_string(), contact_id.to_string()));
860 if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
861 chunk_mailchimp_id_by_user.insert(uuid, contact_id.to_string());
862 }
863 chunk_contact_count += 1;
864 }
865 }
866 }
867 }
868 }
869 if let Some(errors) = response_data["errors"].as_array()
870 && !errors.is_empty()
871 {
872 let sample: Vec<String> = errors
873 .iter()
874 .take(5)
875 .map(|e| {
876 let email = e
877 .get("email_address")
878 .and_then(|v| v.as_str())
879 .unwrap_or("?");
880 let msg = e.get("error").and_then(|v| v.as_str()).unwrap_or_else(|| {
881 e.get("message").and_then(|v| v.as_str()).unwrap_or("?")
882 });
883 format!("{}: {}", email, msg)
884 })
885 .collect();
886 warn!(
887 "Mailchimp batch subscribe chunk {}/{} returned {} error(s) (e.g. unsubscribed/rejected). Sample: {:?}",
888 chunk_index + 1,
889 total_chunks,
890 errors.len(),
891 sample
892 );
893 }
894 info!(
895 "Chunk {}/{}: {} contact_id(s) from new_members/updated_members",
896 chunk_index + 1,
897 total_chunks,
898 chunk_contact_count
899 );
900
901 let chunk_users: Vec<UserMarketingConsentWithDetails> =
902 chunk.iter().map(|user| user.details.clone()).collect();
903 match sync_policy_tags_for_users(
904 token,
905 &chunk_users,
906 &chunk_mailchimp_id_by_user,
907 Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
908 Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
909 )
910 .await
911 {
912 Ok(results) => {
913 let (ok, failed): (Vec<_>, Vec<_>) = results.into_iter().partition(|r| r.success);
914 if !failed.is_empty() {
915 let sample: Vec<String> = failed
916 .iter()
917 .take(5)
918 .map(|r| {
919 format!(
920 "{}: {}",
921 r.user_id,
922 r.error.as_deref().unwrap_or("unknown error")
923 )
924 })
925 .collect();
926 warn!(
927 "Policy tag sync for list '{}' chunk {}/{} had {} failure(s) out of {}. Sample: {:?}",
928 token.mailchimp_mailing_list_id,
929 chunk_index + 1,
930 total_chunks,
931 failed.len(),
932 ok.len() + failed.len(),
933 sample
934 );
935 }
936 }
937 Err(e) => {
938 error!(
939 "Failed to sync policy tags for list '{}' chunk {}/{}: {:?}",
940 token.mailchimp_mailing_list_id,
941 chunk_index + 1,
942 total_chunks,
943 e
944 );
945 }
946 }
947 }
948
949 let got_contact_id_set: std::collections::HashSet<Uuid> = user_id_contact_id_pairs
950 .iter()
951 .filter_map(|(uid, _)| Uuid::parse_str(uid).ok())
952 .collect();
953 let no_contact_id_user_ids: Vec<Uuid> = sent_user_ids
954 .iter()
955 .filter(|id| !got_contact_id_set.contains(id))
956 .copied()
957 .collect();
958
959 if !no_contact_id_user_ids.is_empty() {
960 let sample_len = no_contact_id_user_ids.len().min(10);
961 warn!(
962 "Mailchimp did not return contact_id for {} member(s) (likely unsubscribed, removed, or rejected). Marking synced_to_mailchimp_at to stop retry. First {} user_id(s): {:?}",
963 no_contact_id_user_ids.len(),
964 sample_len,
965 &no_contact_id_user_ids[..sample_len]
966 );
967 if let Err(e) = headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
968 conn,
969 &no_contact_id_user_ids,
970 )
971 .await
972 {
973 error!(
974 "Failed to update synced_to_mailchimp_at for no-contact_id users: {:?}",
975 e
976 );
977 }
978 }
979
980 info!(
981 "Batch subscribe list '{}': sent {} member(s), got {} contact_id(s){}",
982 token.mailchimp_mailing_list_id,
983 sent_user_ids.len(),
984 user_id_contact_id_pairs.len(),
985 if no_contact_id_user_ids.is_empty() {
986 String::new()
987 } else {
988 format!(
989 ", {} without contact_id (marked synced to stop retry)",
990 no_contact_id_user_ids.len()
991 )
992 }
993 );
994
995 if !user_id_contact_id_pairs.is_empty() {
996 headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
997 conn,
998 user_id_contact_id_pairs,
999 )
1000 .await?;
1001 }
1002
1003 Ok(successfully_synced_user_ids)
1004}
1005
1006async fn sync_completed_tag_for_members(
1008 users_details: &[UserMarketingConsentWithDetails],
1009 successfully_synced_user_ids: &[Uuid],
1010 mailchimp_id_by_user: &std::collections::HashMap<Uuid, String>,
1011 slug: &str,
1012 token: &MarketingMailingListAccessToken,
1013) -> anyhow::Result<()> {
1014 let tag_name = format!("{}-completed", slug);
1015 let success_set: std::collections::HashSet<_> = successfully_synced_user_ids.iter().collect();
1016
1017 let mut operations = Vec::new();
1018 for user in users_details {
1019 if !success_set.contains(&user.user_id) {
1020 continue;
1021 }
1022 let Some(user_mailchimp_id) = mailchimp_id_by_user.get(&user.user_id) else {
1023 continue;
1024 };
1025 let status = if user.completed_course_at.is_some() {
1026 "active"
1027 } else {
1028 "inactive"
1029 };
1030 operations.push(MailchimpOperation {
1031 method: Method::POST,
1032 path: format!(
1033 "/lists/{}/members/{}/tags",
1034 token.mailchimp_mailing_list_id, user_mailchimp_id
1035 ),
1036 body: Some(json!({
1037 "tags": [
1038 { "name": tag_name, "status": status }
1039 ]
1040 })),
1041 operation_id: Some(user.user_id.to_string()),
1042 });
1043 }
1044
1045 if operations.is_empty() {
1046 return Ok(());
1047 }
1048
1049 let ops_count = operations.len();
1050 info!(
1051 "Stage: completion tags (mark course completion). Preparing {} operation(s) for list '{}'",
1052 ops_count, token.mailchimp_mailing_list_id
1053 );
1054
1055 let timeout = Duration::from_secs(BATCH_POLL_TIMEOUT_SECS);
1056 let poll_interval = Duration::from_secs(BATCH_POLL_INTERVAL_SECS);
1057 let executor = MailchimpExecutor::new(timeout, poll_interval);
1058
1059 let start_time = Instant::now();
1060 let results = executor.execute(token, operations).await?;
1061 let failures: Vec<_> = results.iter().filter(|r| !r.is_success()).collect();
1062 if !failures.is_empty() {
1063 let sample: Vec<String> = failures
1064 .iter()
1065 .take(5)
1066 .map(|r| {
1067 format!(
1068 "{}: {}",
1069 r.operation_id.as_deref().unwrap_or("?"),
1070 r.error.as_deref().unwrap_or("unknown error")
1071 )
1072 })
1073 .collect();
1074 warn!(
1075 "Completion tag sync for list '{}' had {} failure(s) out of {}. Sample: {:?}",
1076 token.mailchimp_mailing_list_id,
1077 failures.len(),
1078 results.len(),
1079 sample
1080 );
1081 return Err(anyhow::anyhow!("completion tag sync failed"));
1082 }
1083
1084 info!(
1085 "Completed sync of {} completion tags for list '{}' in {:.2}s",
1086 ops_count,
1087 token.mailchimp_mailing_list_id,
1088 start_time.elapsed().as_secs_f64()
1089 );
1090
1091 Ok(())
1092}
1093
1094async fn update_emails_in_mailchimp(
1096 users: Vec<UserEmailSubscription>,
1097 list_id: &str,
1098 server_prefix: &str,
1099 access_token: &str,
1100) -> anyhow::Result<Vec<EmailSyncResult>> {
1101 let mut successfully_synced_users = Vec::new();
1102 let mut failed_user_ids = Vec::new();
1103
1104 for user in users {
1105 if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
1106 if let Some(ref status) = user.email_subscription_in_mailchimp
1107 && status != "subscribed"
1108 {
1109 continue; }
1111
1112 let url = format!(
1113 "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
1114 server_prefix, list_id, user_mailchimp_id
1115 );
1116
1117 let body = serde_json::json!({
1119 "email_address": &user.email,
1120 "status": &user.email_subscription_in_mailchimp,
1121 });
1122
1123 let update_response = REQWEST_CLIENT
1125 .put(&url)
1126 .header("Authorization", format!("apikey {}", access_token))
1127 .json(&body)
1128 .send()
1129 .await?;
1130
1131 if update_response.status().is_success() {
1132 successfully_synced_users.push(EmailSyncResult {
1133 user_id: user.user_id,
1134 user_mailchimp_id: user_mailchimp_id.clone(),
1135 });
1136 } else {
1137 failed_user_ids.push(user.user_id);
1138 }
1139 } else {
1140 continue;
1141 }
1142 }
1143
1144 if !failed_user_ids.is_empty() {
1145 info!("Failed to update the following users:");
1146 for user_id in &failed_user_ids {
1147 error!("User ID: {}", user_id);
1148 }
1149 }
1150
1151 Ok(successfully_synced_users)
1152}
1153
1154async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
1156 list_id: &str,
1157 server_prefix: &str,
1158 access_token: &str,
1159 chunk_size: usize,
1160) -> anyhow::Result<Vec<(String, String, String, String)>> {
1161 let mut all_data = Vec::new();
1162 let mut offset = 0;
1163
1164 loop {
1165 let url = format!(
1166 "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
1167 server_prefix, list_id, offset, chunk_size
1168 );
1169
1170 let response = REQWEST_CLIENT
1171 .get(&url)
1172 .header("Authorization", format!("apikey {}", access_token))
1173 .send()
1174 .await?
1175 .json::<serde_json::Value>()
1176 .await?;
1177
1178 let empty_vec = vec![];
1179 let members = response["members"].as_array().unwrap_or(&empty_vec);
1180 if members.is_empty() {
1181 break;
1182 }
1183
1184 for member in members {
1185 if let (Some(status), Some(last_changed), Some(merge_fields)) = (
1187 member["status"].as_str(),
1188 member["last_changed"].as_str(),
1189 member["merge_fields"].as_object(),
1190 ) {
1191 if let (Some(user_id), Some(language_group_id)) = (
1193 merge_fields.get("USERID").and_then(|v| v.as_str()),
1194 merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
1195 ) {
1196 if !user_id.is_empty() && !language_group_id.is_empty() {
1198 all_data.push((
1199 user_id.to_string(),
1200 last_changed.to_string(),
1201 language_group_id.to_string(),
1202 status.to_string(),
1203 ));
1204 }
1205 }
1206 }
1207 }
1208
1209 let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
1211 if offset + chunk_size >= total_items {
1212 break;
1213 }
1214
1215 offset += chunk_size;
1216 }
1217
1218 Ok(all_data)
1219}
1220
1221const BATCH_SIZE: usize = 1000;
1222
1223async fn process_unsubscribed_users_from_mailchimp(
1224 conn: &mut PgConnection,
1225 mailchimp_data: Vec<(String, String, String, String)>,
1226) -> anyhow::Result<()> {
1227 let total_records = mailchimp_data.len();
1228 let total_chunks = total_records.div_ceil(BATCH_SIZE);
1229
1230 for (chunk_num, chunk) in mailchimp_data.chunks(BATCH_SIZE).enumerate() {
1231 if chunk.is_empty() {
1232 continue;
1233 }
1234
1235 if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
1236 conn,
1237 chunk.to_vec(),
1238 )
1239 .await
1240 {
1241 error!(
1242 "Error while processing chunk {}/{}: {}",
1243 chunk_num + 1,
1244 total_chunks,
1245 e
1246 );
1247 }
1248 }
1249
1250 Ok(())
1251}