headless_lms_server/programs/mailchimp_syncer/
mod.rs

1use crate::prelude::*;
2use crate::setup_tracing;
3use dotenv::dotenv;
4use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
5use headless_lms_models::marketing_consents::UserEmailSubscription;
6use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
7use headless_lms_utils::http::REQWEST_CLIENT;
8use serde_json::json;
9use sqlx::{PgConnection, PgPool};
10use std::{
11    env,
12    time::{Duration, Instant},
13};
14use uuid::Uuid;
15
16mod batch_client;
17mod mailchimp_ops;
18mod policy_tags;
19
20use batch_client::MAX_MAILCHIMP_BATCH_SIZE;
21use mailchimp_ops::{MailchimpExecutor, MailchimpOperation};
22use policy_tags::sync_policy_tags_for_users;
23use reqwest::Method;
24
/// A merge field of a Mailchimp list, as read from `GET /lists/{list_id}/merge-fields`.
///
/// `fetch_current_mailchimp_fields` builds these by hand from the response JSON.
#[derive(Debug, Deserialize)]
struct MailchimpField {
    /// The field's numeric `merge_id`, stringified; used to build the delete URL.
    field_id: String,
    /// Despite its name, this holds the field's merge *tag* (e.g. `FNAME`), not its display
    /// name; it is compared against `FieldSchema::tag`.
    field_name: String,
}
30
/// Definition of a merge field that every synced Mailchimp list must have.
#[derive(Debug)]
struct FieldSchema {
    /// Merge tag; the key used in a member's `merge_fields` object.
    tag: &'static str,
    /// Human-readable name sent as `name` when the field is created.
    name: &'static str,
    /// Sent as `default_value` when the field is created.
    default_value: &'static str,
}

/// Merge fields that schema setup creates on a list when they are missing.
///
/// Changing an entry here only affects lists where the field is created afterwards;
/// fields that already exist in Mailchimp are not updated.
const REQUIRED_FIELDS: &[FieldSchema] = &[
    FieldSchema {
        tag: "FNAME",
        name: "First Name",
        default_value: "",
    },
    FieldSchema {
        tag: "LNAME",
        name: "Last Name",
        default_value: "",
    },
    FieldSchema {
        tag: "MARKETING",
        name: "Accepts Marketing",
        default_value: "disallowed",
    },
    FieldSchema {
        tag: "LOCALE",
        name: "Locale",
        default_value: "en",
    },
    FieldSchema {
        tag: "GRADUATED",
        name: "Graduated",
        default_value: "",
    },
    FieldSchema {
        tag: "COURSEID",
        name: "Course ID",
        default_value: "",
    },
    FieldSchema {
        tag: "LANGGRPID",
        name: "Course language Group ID",
        default_value: "",
    },
    FieldSchema {
        tag: "USERID",
        name: "User ID",
        default_value: "",
    },
    FieldSchema {
        tag: "RESEARCH",
        name: "Research consent",
        // The syncer writes "allowed"/"disallowed" for this field (same vocabulary as
        // MARKETING), so the default must be one of those values rather than "false".
        default_value: "disallowed",
    },
];
85
/// Merge-field tags that schema cleanup never deletes, even though they are not in
/// `REQUIRED_FIELDS`. Only relevant when `REMOVE_UNSUPPORTED_FIELDS` is enabled.
const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
/// When `true`, schema setup deletes merge fields that are neither required nor excluded.
const REMOVE_UNSUPPORTED_FIELDS: bool = false;
/// Minimum time between two passes that apply Mailchimp unsubscribes locally (3 hours).
const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 3 * 60 * 60;

/// Seconds between two reconciliation rounds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of rounds between two "Still syncing." log lines.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;

/// Poll interval handed to policy tag batch sync.
const BATCH_POLL_INTERVAL_SECS: u64 = 10;
/// Poll timeout handed to policy tag batch sync (5 minutes).
const BATCH_POLL_TIMEOUT_SECS: u64 = 5 * 60;
96
/// A user selected for a Mailchimp member upsert, paired with the member JSON built for them.
#[derive(Debug)]
struct SyncUser {
    /// The consent/user row the payload was built from.
    details: UserMarketingConsentWithDetails,
    /// Member object placed in the `members` array of the batch subscribe request.
    payload: serde_json::Value,
}

/// Result of updating one user's email address in Mailchimp.
///
/// NOTE(review): produced by `update_emails_in_mailchimp`, which is outside this view —
/// presumably one entry per user whose email update succeeded; confirm there.
#[derive(Debug)]
struct EmailSyncResult {
    /// The local user whose email was synced; later marked as synced.
    user_id: Uuid,
    /// The user's Mailchimp member identifier; passed to policy tag sync.
    user_mailchimp_id: String,
}
108
109/// The main function that initializes environment variables, config, and sync process.
110pub async fn main() -> anyhow::Result<()> {
111    initialize_environment()?;
112
113    let config = initialize_configuration().await?;
114
115    let db_pool = initialize_database_pool(&config.database_url).await?;
116    let mut conn = db_pool.acquire().await?;
117
118    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
119    let mut ticks = 0;
120
121    let access_tokens =
122        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
123            &mut conn,
124        )
125        .await?;
126
127    // Iterate through access tokens and ensure Mailchimp schema is set up
128    for token in &access_tokens {
129        if let Err(e) = ensure_mailchimp_schema(
130            &token.mailchimp_mailing_list_id,
131            &token.server_prefix,
132            &token.access_token,
133        )
134        .await
135        {
136            error!(
137                "Failed to set up Mailchimp schema for list '{}': {:?}",
138                token.mailchimp_mailing_list_id, e
139            );
140            return Err(e);
141        }
142    }
143
144    info!("Starting mailchimp syncer (periodic reconciliation loop).");
145
146    let mut last_time_unsubscribes_processed = Instant::now();
147    let mut last_time_tags_synced = Instant::now();
148
149    loop {
150        interval.tick().await;
151        ticks += 1;
152
153        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
154            ticks = 0;
155            info!("Still syncing.");
156        }
157        let mut process_unsubscribes = false;
158        if last_time_unsubscribes_processed.elapsed().as_secs()
159            >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
160        {
161            process_unsubscribes = true;
162            last_time_unsubscribes_processed = Instant::now();
163        };
164
165        // Check and sync tags for this access token once every hour
166        if last_time_tags_synced.elapsed().as_secs() >= 3600 {
167            info!("Stage: tag catalog sync (keep local tag metadata aligned with Mailchimp).");
168            for token in &access_tokens {
169                if let Err(e) = sync_tags_from_mailchimp(
170                    &mut conn,
171                    &token.mailchimp_mailing_list_id,
172                    &token.access_token,
173                    &token.server_prefix,
174                    token.id,
175                    token.course_language_group_id,
176                )
177                .await
178                {
179                    error!(
180                        "Failed to sync tags for list '{}': {:?}",
181                        token.mailchimp_mailing_list_id, e
182                    );
183                }
184            }
185            last_time_tags_synced = Instant::now();
186        }
187
188        if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
189            error!("Error during synchronization: {:?}", e);
190            if let Ok(sqlx::Error::Io(..)) = e.downcast::<sqlx::Error>() {
191                // this usually happens if the database is reset while running bin/dev etc.
192                info!("syncer may have lost its connection to the db, trying to reconnect");
193                conn = db_pool.acquire().await?;
194            }
195        }
196    }
197}
198
199/// Initializes environment variables, logging, and tracing setup.
200fn initialize_environment() -> anyhow::Result<()> {
201    // TODO: Audit that the environment access only happens in single-threaded code.
202    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
203    dotenv().ok();
204    setup_tracing()?;
205    Ok(())
206}
207
/// Runtime configuration of the syncer.
struct SyncerConfig {
    /// PostgreSQL connection string, read from `DATABASE_URL` with a local-development
    /// fallback in `initialize_configuration`.
    database_url: String,
}
212
213/// Initializes and returns configuration settings (database URL).
214async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
215    let database_url = env::var("DATABASE_URL")
216        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
217
218    Ok(SyncerConfig { database_url })
219}
220
221/// Initializes the PostgreSQL connection pool from the provided database URL.
222async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
223    PgPool::connect(database_url).await.map_err(|e| {
224        anyhow::anyhow!(
225            "Failed to connect to the database at {}: {:?}",
226            database_url,
227            e
228        )
229    })
230}
231
232/// Ensures the Mailchimp schema is up to date, adding required fields and removing any extra ones.
233async fn ensure_mailchimp_schema(
234    list_id: &str,
235    server_prefix: &str,
236    access_token: &str,
237) -> anyhow::Result<()> {
238    let existing_fields =
239        fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
240
241    if REMOVE_UNSUPPORTED_FIELDS {
242        // Remove extra fields not in REQUIRED_FIELDS or FIELDS_EXCLUDED_FROM_REMOVING
243        for field in existing_fields.iter() {
244            if !REQUIRED_FIELDS
245                .iter()
246                .any(|r| r.tag == field.field_name.as_str())
247                && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
248            {
249                match remove_field_from_mailchimp(
250                    list_id,
251                    &field.field_id,
252                    server_prefix,
253                    access_token,
254                )
255                .await
256                {
257                    Err(e) => {
258                        warn!("Could not remove field '{}': {}", field.field_name, e);
259                    }
260                    _ => {
261                        info!("Removed field '{}'", field.field_name);
262                    }
263                }
264            }
265        }
266    }
267
268    // Add any required fields that are missing
269    for required_field in REQUIRED_FIELDS.iter() {
270        if !existing_fields
271            .iter()
272            .any(|f| f.field_name == required_field.tag)
273        {
274            match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
275            {
276                Err(e) => {
277                    warn!(
278                        "Failed to add required field '{}': {}",
279                        required_field.name, e
280                    );
281                }
282                _ => {
283                    info!(
284                        "Successfully added required field '{}'",
285                        required_field.name
286                    );
287                }
288            }
289        } else {
290            info!(
291                "Field '{}' already exists, skipping addition.",
292                required_field.name
293            );
294        }
295    }
296
297    Ok(())
298}
299
300/// Fetches the current merge fields from the Mailchimp list schema.
301async fn fetch_current_mailchimp_fields(
302    list_id: &str,
303    server_prefix: &str,
304    access_token: &str,
305) -> Result<Vec<MailchimpField>, anyhow::Error> {
306    let url = format!(
307        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
308        server_prefix, list_id
309    );
310
311    let response = REQWEST_CLIENT
312        .get(&url)
313        .header("Authorization", format!("apikey {}", access_token))
314        .send()
315        .await?;
316
317    if response.status().is_success() {
318        let json = response.json::<serde_json::Value>().await?;
319
320        let fields: Vec<MailchimpField> = json["merge_fields"]
321            .as_array()
322            .unwrap_or(&vec![])
323            .iter()
324            .filter_map(|field| {
325                let field_id = field["merge_id"].as_u64();
326                let field_name = field["tag"].as_str();
327
328                if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
329                    Some(MailchimpField {
330                        field_id: field_id.to_string(),
331                        field_name: field_name.to_string(),
332                    })
333                } else {
334                    None
335                }
336            })
337            .collect();
338
339        Ok(fields)
340    } else {
341        let error_text = response.text().await?;
342        error!("Error fetching merge fields: {}", error_text);
343        Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
344    }
345}
346
347/// Adds a new merge field to the Mailchimp list.
348async fn add_field_to_mailchimp(
349    list_id: &str,
350    field_schema: &FieldSchema,
351    server_prefix: &str,
352    access_token: &str,
353) -> anyhow::Result<()> {
354    let url = format!(
355        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
356        server_prefix, list_id
357    );
358
359    let body = json!({
360        "tag": field_schema.tag,
361        "name": field_schema.name,
362        "type": "text",
363        "default_value": field_schema.default_value,
364    });
365
366    let response = REQWEST_CLIENT
367        .post(&url)
368        .header("Authorization", format!("apikey {}", access_token))
369        .json(&body)
370        .send()
371        .await?;
372
373    if response.status().is_success() {
374        Ok(())
375    } else {
376        let status = response.status();
377        let error_text = response
378            .text()
379            .await
380            .unwrap_or_else(|_| "No additional error info.".to_string());
381        Err(anyhow::anyhow!(
382            "Failed to add field to Mailchimp. Status: {}. Error: {}",
383            status,
384            error_text
385        ))
386    }
387}
388
389/// Removes a merge field from the Mailchimp list by with a field ID.
390async fn remove_field_from_mailchimp(
391    list_id: &str,
392    field_id: &str,
393    server_prefix: &str,
394    access_token: &str,
395) -> anyhow::Result<()> {
396    let url = format!(
397        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
398        server_prefix, list_id, field_id
399    );
400
401    let response = REQWEST_CLIENT
402        .delete(&url)
403        .header("Authorization", format!("apikey {}", access_token))
404        .send()
405        .await?;
406
407    if response.status().is_success() {
408        Ok(())
409    } else {
410        let status = response.status();
411        let error_text = response
412            .text()
413            .await
414            .unwrap_or_else(|_| "No additional error info.".to_string());
415        Err(anyhow::anyhow!(
416            "Failed to remove field from Mailchimp. Status: {}. Error: {}",
417            status,
418            error_text
419        ))
420    }
421}
422
423/// Fetch tags from mailchimp and sync the changes to the database
424pub async fn sync_tags_from_mailchimp(
425    conn: &mut PgConnection,
426    list_id: &str,
427    access_token: &str,
428    server_prefix: &str,
429    marketing_mailing_list_access_token_id: Uuid,
430    course_language_group_id: Uuid,
431) -> anyhow::Result<()> {
432    let url = format!(
433        "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
434        server_prefix, list_id
435    );
436
437    let response = REQWEST_CLIENT
438        .get(&url)
439        .header("Authorization", format!("apikey {}", access_token))
440        .send()
441        .await?;
442
443    // Extract tags from the response
444    let response_json = response.json::<serde_json::Value>().await?;
445
446    let mailchimp_tags = match response_json.get("tags") {
447        Some(tags) if tags.is_array() => tags
448            .as_array()
449            .ok_or_else(|| anyhow::anyhow!("tags field is not an array despite is_array() check"))?
450            .iter()
451            .filter_map(|tag| {
452                let name = tag.get("name")?.as_str()?.to_string();
453
454                let id = match tag.get("id") {
455                    Some(serde_json::Value::Number(num)) => num.to_string(),
456                    Some(serde_json::Value::String(str_id)) => str_id.clone(),
457                    _ => return None,
458                };
459
460                Some((id, name))
461            })
462            .collect::<Vec<(String, String)>>(),
463        _ => {
464            warn!("No tags found for list '{}', skipping sync.", list_id);
465            return Ok(());
466        }
467    };
468
469    // Fetch the current tags from the database
470    let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
471            conn,
472            course_language_group_id,
473            marketing_mailing_list_access_token_id,
474        )
475        .await?;
476
477    // Check if any tags from Mailchimp are renamed
478    for (tag_id, tag_name) in &mailchimp_tags {
479        if let Some(db_tag) = db_tags.iter().find(|db_tag| {
480            db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
481        }) {
482            let db_tag_name = db_tag
483                .get("tag_name")
484                .and_then(|v| v.as_str())
485                .unwrap_or_default();
486            if db_tag_name != tag_name {
487                headless_lms_models::marketing_consents::upsert_tag(
488                    conn,
489                    course_language_group_id,
490                    marketing_mailing_list_access_token_id,
491                    tag_id.clone(),
492                    tag_name.clone(),
493                )
494                .await?;
495            }
496        }
497    }
498
499    // Check if any tags in the database have been removed via Mailchimp
500    for db_tag in db_tags.iter() {
501        let db_tag_id = db_tag
502            .get("id")
503            .and_then(|v| v.as_str())
504            .map(|v| v.trim().to_string())
505            .unwrap_or_default();
506
507        // Check if this tag exists in the Mailchimp tags
508        if !mailchimp_tags
509            .iter()
510            .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
511        {
512            if db_tag_id.is_empty() {
513                warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
514                continue;
515            }
516            headless_lms_models::marketing_consents::delete_tag(
517                conn,
518                db_tag_id.clone(),
519                course_language_group_id,
520            )
521            .await?;
522        }
523    }
524
525    Ok(())
526}
527
/// Pushes pending local changes to Mailchimp for every configured mailing list.
///
/// The access tokens are re-read from the database on every call. For each token, in order:
/// 1. Look up the course language group slug, used for the completed-tag step.
/// 2. If `process_unsubscribes` is set, fetch unsubscribed members from Mailchimp and apply
///    them locally.
/// 3. Send unsynced email-address changes to Mailchimp, then re-sync policy tags for those
///    users. Policy tag failures are only logged.
/// 4. Upsert members whose consent data is unsynced, together with the list's stored tags.
///    Then sync the completed tag for them when a slug exists; failures there are only
///    logged.
///
/// Finally, all user ids collected in steps 3 and 4 are marked as synced in one update.
/// A failure of that update is logged, not returned.
///
/// `_config` is currently unused.
///
/// # Errors
/// Any other failure (database query, Mailchimp request) aborts the whole call via `?`.
/// NOTE(review): in that case ids already synced for earlier tokens in this call are not
/// marked as synced, so those users are presumably sent again in the next round.
async fn sync_contacts(
    conn: &mut PgConnection,
    _config: &SyncerConfig,
    process_unsubscribes: bool,
) -> anyhow::Result<()> {
    let access_tokens =
        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
            conn,
        )
        .await?;

    // User ids to mark as synced once all tokens have been processed.
    let mut successfully_synced_user_ids = Vec::new();

    // Iterate through tokens and fetch and send user details to Mailchimp
    for token in access_tokens {
        // A missing slug is not an error; it only skips the completed-tag step below.
        let course_language_group_slug =
            match headless_lms_models::course_language_groups::get_slug_by_id(
                conn,
                token.course_language_group_id,
            )
            .await
            {
                Ok(Some(s)) => Some(s),
                Ok(None) => None,
                Err(e) => {
                    error!(
                        course_language_group_id = %token.course_language_group_id,
                        "Failed to get course language group slug: {:?}",
                        e
                    );
                    return Err(e.into());
                }
            };

        // Fetch members Mailchimp reports as unsubscribed and apply that locally.
        // NOTE(review): the `1000` argument is presumably the page size; confirm in
        // `fetch_unsubscribed_users_from_mailchimp_in_chunks`.
        if process_unsubscribes {
            info!(
                "Stage: unsubscribe sync (apply Mailchimp compliance/unsubscribe changes locally)."
            );
            let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
                &token.mailchimp_mailing_list_id,
                &token.server_prefix,
                &token.access_token,
                1000,
            )
            .await?;

            info!(
                "Processing Mailchimp data for list: {}",
                token.mailchimp_mailing_list_id
            );

            process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
        }

        // Fetch unsynced emails and update them in Mailchimp
        let users_with_unsynced_emails =
            headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
                conn,
                token.course_language_group_id,
            )
            .await?;

        info!(
            "Stage: email updates (ensure member identifiers stay correct). Found {} unsynced user email(s) for course language group: {}",
            users_with_unsynced_emails.len(),
            token.course_language_group_id
        );

        if !users_with_unsynced_emails.is_empty() {
            let email_sync_results = update_emails_in_mailchimp(
                users_with_unsynced_emails,
                &token.mailchimp_mailing_list_id,
                &token.server_prefix,
                &token.access_token,
            )
            .await?;

            // These users are recorded as synced before, and independently of, the policy
            // tag step below.
            let email_synced_user_ids: Vec<Uuid> =
                email_sync_results.iter().map(|r| r.user_id).collect();
            successfully_synced_user_ids.extend(email_synced_user_ids.clone());

            // Re-apply policy tags to the members whose email changed; failures are logged
            // with a sample of up to 5 users and do not stop the round.
            if !email_sync_results.is_empty() {
                let mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
                    email_sync_results
                        .iter()
                        .map(|r| (r.user_id, r.user_mailchimp_id.clone()))
                        .collect();
                let user_details =
                    headless_lms_models::marketing_consents::fetch_user_marketing_consents_with_details_by_user_ids(
                        conn,
                        token.course_language_group_id,
                        &email_synced_user_ids,
                    )
                    .await?;
                match sync_policy_tags_for_users(
                    &token,
                    &user_details,
                    &mailchimp_id_by_user,
                    Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
                    Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
                )
                .await
                {
                    Ok(results) => {
                        let (ok, failed): (Vec<_>, Vec<_>) =
                            results.into_iter().partition(|r| r.success);
                        if !failed.is_empty() {
                            let sample: Vec<String> = failed
                                .iter()
                                .take(5)
                                .map(|r| {
                                    format!(
                                        "{}: {}",
                                        r.user_id,
                                        r.error.as_deref().unwrap_or("unknown error")
                                    )
                                })
                                .collect();
                            warn!(
                                "Policy tag sync after email updates for list '{}' had {} failure(s) out of {}. Sample: {:?}",
                                token.mailchimp_mailing_list_id,
                                failed.len(),
                                ok.len() + failed.len(),
                                sample
                            );
                        }
                    }
                    Err(e) => {
                        error!(
                            "Failed to sync policy tags after email updates for list '{}': {:?}",
                            token.mailchimp_mailing_list_id, e
                        );
                    }
                }
            }
        }

        // Tags stored for this list; handed to the member upsert below.
        let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;

        // Fetch unsynced user consents and update them in Mailchimp
        let unsynced_users_details =
            headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
                conn,
                token.course_language_group_id,
            )
            .await?;

        info!(
            "Stage: member upsert (merge fields + consent). Found {} unsynced user consent(s) for course language group: {}",
            unsynced_users_details.len(),
            token.course_language_group_id
        );

        if !unsynced_users_details.is_empty() {
            let consent_synced_user_ids =
                send_users_to_mailchimp(conn, &token, &unsynced_users_details, tag_objects).await?;

            // The completed tag needs the group slug; a failure here is logged only.
            if let Some(ref slug) = course_language_group_slug
                && !consent_synced_user_ids.is_empty()
            {
                let mailchimp_id_mapping =
                    headless_lms_models::marketing_consents::fetch_user_mailchimp_id_mapping(
                        conn,
                        token.course_language_group_id,
                        &consent_synced_user_ids,
                    )
                    .await?;
                if let Err(e) = sync_completed_tag_for_members(
                    &unsynced_users_details,
                    &consent_synced_user_ids,
                    &mailchimp_id_mapping,
                    slug,
                    &token,
                )
                .await
                {
                    error!(
                        "Failed to sync completed tag for list '{}': {:?}",
                        token.mailchimp_mailing_list_id, e
                    );
                }
            }

            // Store the successfully synced user IDs from syncing user consents
            successfully_synced_user_ids.extend(consent_synced_user_ids);
        }
    }

    // If there are any successfully synced users, update the database to mark them as synced
    if !successfully_synced_user_ids.is_empty() {
        match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
        conn,
        &successfully_synced_user_ids,
    )
    .await
    {
        Ok(_) => {
            info!(
                "Stage: mark synced (avoid repeat work). Successfully updated synced status for {} users.",
                successfully_synced_user_ids.len()
            );
        }
        // Logged only: the users stay unsynced locally and are presumably retried next round.
        Err(e) => {
            error!(
                "Failed to update synced status for {} users: {:?}",
                successfully_synced_user_ids.len(),
                e
            );
        }
    }
    }

    Ok(())
}
745
746/// Sends a batch of users to Mailchimp for synchronization.
747pub async fn send_users_to_mailchimp(
748    conn: &mut PgConnection,
749    token: &MarketingMailingListAccessToken,
750    users_details: &[UserMarketingConsentWithDetails],
751    tag_objects: Vec<serde_json::Value>,
752) -> anyhow::Result<Vec<Uuid>> {
753    let mut users_to_sync = vec![];
754    let mut sent_user_ids = Vec::new();
755    let mut successfully_synced_user_ids = Vec::new();
756    let mut user_id_contact_id_pairs = Vec::new();
757
758    // Prepare each user's data for Mailchimp
759    for user in users_details {
760        // Check user has given permission to send data to mailchimp
761        if let Some(ref subscription) = user.email_subscription_in_mailchimp
762            && subscription == "subscribed"
763        {
764            sent_user_ids.push(user.user_id);
765            let user_details = json!({
766                "email_address": user.email,
767                "status": user.email_subscription_in_mailchimp,
768                "merge_fields": {
769                    "FNAME": user.first_name.clone().unwrap_or("".to_string()),
770                    "LNAME": user.last_name.clone().unwrap_or("".to_string()),
771                    "MARKETING": if user.consent { "allowed" } else { "disallowed" },
772                    "LOCALE": user.locale,
773                    "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
774                    "USERID": user.user_id,
775                    "COURSEID": user.course_id,
776                    "LANGGRPID": user.course_language_group_id,
777                    "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
778                    "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
779                },
780               "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
781            });
782            users_to_sync.push(SyncUser {
783                details: user.clone(),
784                payload: user_details,
785            });
786        }
787    }
788
789    if users_to_sync.is_empty() {
790        info!("No new users to sync.");
791        return Ok(vec![]);
792    }
793
794    let url = format!(
795        "https://{}.api.mailchimp.com/3.0/lists/{}",
796        token.server_prefix, token.mailchimp_mailing_list_id
797    );
798
799    let total_chunks = users_to_sync.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
800    info!(
801        "Syncing {} members to list '{}' in {} chunk(s)",
802        users_to_sync.len(),
803        token.mailchimp_mailing_list_id,
804        total_chunks
805    );
806
807    for (chunk_index, chunk) in users_to_sync.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
808        info!(
809            "Syncing users chunk {}/{} ({} members)",
810            chunk_index + 1,
811            total_chunks,
812            chunk.len()
813        );
814        let chunk_members: Vec<serde_json::Value> =
815            chunk.iter().map(|user| user.payload.clone()).collect();
816        let batch_request = json!({
817            "members": chunk_members,
818            "update_existing": true
819        });
820
821        let response = REQWEST_CLIENT
822            .post(&url)
823            .header("Content-Type", "application/json")
824            .header("Authorization", format!("apikey {}", token.access_token))
825            .json(&batch_request)
826            .send()
827            .await?;
828
829        if !response.status().is_success() {
830            let status = response.status();
831            let error_text = response.text().await?;
832            return Err(anyhow::anyhow!(
833                "Error syncing users to Mailchimp. Status: {}. Error: {}",
834                status,
835                error_text
836            ));
837        }
838
839        let response_data: serde_json::Value = response.json().await?;
840        let mut chunk_contact_count = 0;
841        let mut chunk_mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
842            std::collections::HashMap::new();
843        for user in chunk {
844            if let Some(ref mailchimp_id) = user.details.user_mailchimp_id {
845                chunk_mailchimp_id_by_user
846                    .entry(user.details.user_id)
847                    .or_insert_with(|| mailchimp_id.clone());
848            }
849        }
850        for key in &["new_members", "updated_members"] {
851            if let Some(members) = response_data[key].as_array() {
852                for member in members {
853                    if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
854                        if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
855                            successfully_synced_user_ids.push(uuid);
856                        }
857                        if let Some(contact_id) = member["contact_id"].as_str() {
858                            user_id_contact_id_pairs
859                                .push((user_id.to_string(), contact_id.to_string()));
860                            if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
861                                chunk_mailchimp_id_by_user.insert(uuid, contact_id.to_string());
862                            }
863                            chunk_contact_count += 1;
864                        }
865                    }
866                }
867            }
868        }
869        if let Some(errors) = response_data["errors"].as_array()
870            && !errors.is_empty()
871        {
872            let sample: Vec<String> = errors
873                .iter()
874                .take(5)
875                .map(|e| {
876                    let email = e
877                        .get("email_address")
878                        .and_then(|v| v.as_str())
879                        .unwrap_or("?");
880                    let msg = e.get("error").and_then(|v| v.as_str()).unwrap_or_else(|| {
881                        e.get("message").and_then(|v| v.as_str()).unwrap_or("?")
882                    });
883                    format!("{}: {}", email, msg)
884                })
885                .collect();
886            warn!(
887                "Mailchimp batch subscribe chunk {}/{} returned {} error(s) (e.g. unsubscribed/rejected). Sample: {:?}",
888                chunk_index + 1,
889                total_chunks,
890                errors.len(),
891                sample
892            );
893        }
894        info!(
895            "Chunk {}/{}: {} contact_id(s) from new_members/updated_members",
896            chunk_index + 1,
897            total_chunks,
898            chunk_contact_count
899        );
900
901        let chunk_users: Vec<UserMarketingConsentWithDetails> =
902            chunk.iter().map(|user| user.details.clone()).collect();
903        match sync_policy_tags_for_users(
904            token,
905            &chunk_users,
906            &chunk_mailchimp_id_by_user,
907            Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
908            Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
909        )
910        .await
911        {
912            Ok(results) => {
913                let (ok, failed): (Vec<_>, Vec<_>) = results.into_iter().partition(|r| r.success);
914                if !failed.is_empty() {
915                    let sample: Vec<String> = failed
916                        .iter()
917                        .take(5)
918                        .map(|r| {
919                            format!(
920                                "{}: {}",
921                                r.user_id,
922                                r.error.as_deref().unwrap_or("unknown error")
923                            )
924                        })
925                        .collect();
926                    warn!(
927                        "Policy tag sync for list '{}' chunk {}/{} had {} failure(s) out of {}. Sample: {:?}",
928                        token.mailchimp_mailing_list_id,
929                        chunk_index + 1,
930                        total_chunks,
931                        failed.len(),
932                        ok.len() + failed.len(),
933                        sample
934                    );
935                }
936            }
937            Err(e) => {
938                error!(
939                    "Failed to sync policy tags for list '{}' chunk {}/{}: {:?}",
940                    token.mailchimp_mailing_list_id,
941                    chunk_index + 1,
942                    total_chunks,
943                    e
944                );
945            }
946        }
947    }
948
949    let got_contact_id_set: std::collections::HashSet<Uuid> = user_id_contact_id_pairs
950        .iter()
951        .filter_map(|(uid, _)| Uuid::parse_str(uid).ok())
952        .collect();
953    let no_contact_id_user_ids: Vec<Uuid> = sent_user_ids
954        .iter()
955        .filter(|id| !got_contact_id_set.contains(id))
956        .copied()
957        .collect();
958
959    if !no_contact_id_user_ids.is_empty() {
960        let sample_len = no_contact_id_user_ids.len().min(10);
961        warn!(
962            "Mailchimp did not return contact_id for {} member(s) (likely unsubscribed, removed, or rejected). Marking synced_to_mailchimp_at to stop retry. First {} user_id(s): {:?}",
963            no_contact_id_user_ids.len(),
964            sample_len,
965            &no_contact_id_user_ids[..sample_len]
966        );
967        if let Err(e) = headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
968            conn,
969            &no_contact_id_user_ids,
970        )
971        .await
972        {
973            error!(
974                "Failed to update synced_to_mailchimp_at for no-contact_id users: {:?}",
975                e
976            );
977        }
978    }
979
980    info!(
981        "Batch subscribe list '{}': sent {} member(s), got {} contact_id(s){}",
982        token.mailchimp_mailing_list_id,
983        sent_user_ids.len(),
984        user_id_contact_id_pairs.len(),
985        if no_contact_id_user_ids.is_empty() {
986            String::new()
987        } else {
988            format!(
989                ", {} without contact_id (marked synced to stop retry)",
990                no_contact_id_user_ids.len()
991            )
992        }
993    );
994
995    if !user_id_contact_id_pairs.is_empty() {
996        headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
997            conn,
998            user_id_contact_id_pairs,
999        )
1000        .await?;
1001    }
1002
1003    Ok(successfully_synced_user_ids)
1004}
1005
1006/// Sets or removes the "{slug}-completed" tag on Mailchimp members based on course completion. Skips users without user_mailchimp_id.
1007async fn sync_completed_tag_for_members(
1008    users_details: &[UserMarketingConsentWithDetails],
1009    successfully_synced_user_ids: &[Uuid],
1010    mailchimp_id_by_user: &std::collections::HashMap<Uuid, String>,
1011    slug: &str,
1012    token: &MarketingMailingListAccessToken,
1013) -> anyhow::Result<()> {
1014    let tag_name = format!("{}-completed", slug);
1015    let success_set: std::collections::HashSet<_> = successfully_synced_user_ids.iter().collect();
1016
1017    let mut operations = Vec::new();
1018    for user in users_details {
1019        if !success_set.contains(&user.user_id) {
1020            continue;
1021        }
1022        let Some(user_mailchimp_id) = mailchimp_id_by_user.get(&user.user_id) else {
1023            continue;
1024        };
1025        let status = if user.completed_course_at.is_some() {
1026            "active"
1027        } else {
1028            "inactive"
1029        };
1030        operations.push(MailchimpOperation {
1031            method: Method::POST,
1032            path: format!(
1033                "/lists/{}/members/{}/tags",
1034                token.mailchimp_mailing_list_id, user_mailchimp_id
1035            ),
1036            body: Some(json!({
1037                "tags": [
1038                    { "name": tag_name, "status": status }
1039                ]
1040            })),
1041            operation_id: Some(user.user_id.to_string()),
1042        });
1043    }
1044
1045    if operations.is_empty() {
1046        return Ok(());
1047    }
1048
1049    let ops_count = operations.len();
1050    info!(
1051        "Stage: completion tags (mark course completion). Preparing {} operation(s) for list '{}'",
1052        ops_count, token.mailchimp_mailing_list_id
1053    );
1054
1055    let timeout = Duration::from_secs(BATCH_POLL_TIMEOUT_SECS);
1056    let poll_interval = Duration::from_secs(BATCH_POLL_INTERVAL_SECS);
1057    let executor = MailchimpExecutor::new(timeout, poll_interval);
1058
1059    let start_time = Instant::now();
1060    let results = executor.execute(token, operations).await?;
1061    let failures: Vec<_> = results.iter().filter(|r| !r.is_success()).collect();
1062    if !failures.is_empty() {
1063        let sample: Vec<String> = failures
1064            .iter()
1065            .take(5)
1066            .map(|r| {
1067                format!(
1068                    "{}: {}",
1069                    r.operation_id.as_deref().unwrap_or("?"),
1070                    r.error.as_deref().unwrap_or("unknown error")
1071                )
1072            })
1073            .collect();
1074        warn!(
1075            "Completion tag sync for list '{}' had {} failure(s) out of {}. Sample: {:?}",
1076            token.mailchimp_mailing_list_id,
1077            failures.len(),
1078            results.len(),
1079            sample
1080        );
1081        return Err(anyhow::anyhow!("completion tag sync failed"));
1082    }
1083
1084    info!(
1085        "Completed sync of {} completion tags for list '{}' in {:.2}s",
1086        ops_count,
1087        token.mailchimp_mailing_list_id,
1088        start_time.elapsed().as_secs_f64()
1089    );
1090
1091    Ok(())
1092}
1093
1094/// Updates the email addresses of multiple users in a Mailchimp mailing list.
1095async fn update_emails_in_mailchimp(
1096    users: Vec<UserEmailSubscription>,
1097    list_id: &str,
1098    server_prefix: &str,
1099    access_token: &str,
1100) -> anyhow::Result<Vec<EmailSyncResult>> {
1101    let mut successfully_synced_users = Vec::new();
1102    let mut failed_user_ids = Vec::new();
1103
1104    for user in users {
1105        if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
1106            if let Some(ref status) = user.email_subscription_in_mailchimp
1107                && status != "subscribed"
1108            {
1109                continue; // Skip this user if they are not subscribed because Mailchimp only updates emails that are subscribed
1110            }
1111
1112            let url = format!(
1113                "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
1114                server_prefix, list_id, user_mailchimp_id
1115            );
1116
1117            // Prepare the body for the PUT request
1118            let body = serde_json::json!({
1119                "email_address": &user.email,
1120                "status": &user.email_subscription_in_mailchimp,
1121            });
1122
1123            // Update the email
1124            let update_response = REQWEST_CLIENT
1125                .put(&url)
1126                .header("Authorization", format!("apikey {}", access_token))
1127                .json(&body)
1128                .send()
1129                .await?;
1130
1131            if update_response.status().is_success() {
1132                successfully_synced_users.push(EmailSyncResult {
1133                    user_id: user.user_id,
1134                    user_mailchimp_id: user_mailchimp_id.clone(),
1135                });
1136            } else {
1137                failed_user_ids.push(user.user_id);
1138            }
1139        } else {
1140            continue;
1141        }
1142    }
1143
1144    if !failed_user_ids.is_empty() {
1145        info!("Failed to update the following users:");
1146        for user_id in &failed_user_ids {
1147            error!("User ID: {}", user_id);
1148        }
1149    }
1150
1151    Ok(successfully_synced_users)
1152}
1153
1154/// Fetches data from Mailchimp in chunks.
1155async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
1156    list_id: &str,
1157    server_prefix: &str,
1158    access_token: &str,
1159    chunk_size: usize,
1160) -> anyhow::Result<Vec<(String, String, String, String)>> {
1161    let mut all_data = Vec::new();
1162    let mut offset = 0;
1163
1164    loop {
1165        let url = format!(
1166            "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
1167            server_prefix, list_id, offset, chunk_size
1168        );
1169
1170        let response = REQWEST_CLIENT
1171            .get(&url)
1172            .header("Authorization", format!("apikey {}", access_token))
1173            .send()
1174            .await?
1175            .json::<serde_json::Value>()
1176            .await?;
1177
1178        let empty_vec = vec![];
1179        let members = response["members"].as_array().unwrap_or(&empty_vec);
1180        if members.is_empty() {
1181            break;
1182        }
1183
1184        for member in members {
1185            // Process the member, but only if necessary fields are present and valid
1186            if let (Some(status), Some(last_changed), Some(merge_fields)) = (
1187                member["status"].as_str(),
1188                member["last_changed"].as_str(),
1189                member["merge_fields"].as_object(),
1190            ) {
1191                // Ensure both USERID and LANGGRPID are present and valid
1192                if let (Some(user_id), Some(language_group_id)) = (
1193                    merge_fields.get("USERID").and_then(|v| v.as_str()),
1194                    merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
1195                ) {
1196                    // Avoid adding data if any field is missing or empty
1197                    if !user_id.is_empty() && !language_group_id.is_empty() {
1198                        all_data.push((
1199                            user_id.to_string(),
1200                            last_changed.to_string(),
1201                            language_group_id.to_string(),
1202                            status.to_string(),
1203                        ));
1204                    }
1205                }
1206            }
1207        }
1208
1209        // Check the pagination info from the response
1210        let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
1211        if offset + chunk_size >= total_items {
1212            break;
1213        }
1214
1215        offset += chunk_size;
1216    }
1217
1218    Ok(all_data)
1219}
1220
1221const BATCH_SIZE: usize = 1000;
1222
1223async fn process_unsubscribed_users_from_mailchimp(
1224    conn: &mut PgConnection,
1225    mailchimp_data: Vec<(String, String, String, String)>,
1226) -> anyhow::Result<()> {
1227    let total_records = mailchimp_data.len();
1228    let total_chunks = total_records.div_ceil(BATCH_SIZE);
1229
1230    for (chunk_num, chunk) in mailchimp_data.chunks(BATCH_SIZE).enumerate() {
1231        if chunk.is_empty() {
1232            continue;
1233        }
1234
1235        if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
1236            conn,
1237            chunk.to_vec(),
1238        )
1239        .await
1240        {
1241            error!(
1242                "Error while processing chunk {}/{}: {}",
1243                chunk_num + 1,
1244                total_chunks,
1245                e
1246            );
1247        }
1248    }
1249
1250    Ok(())
1251}