Skip to main content

headless_lms_server/programs/mailchimp_syncer/
mod.rs

1use crate::config::program_config::ProgramConfig;
2use crate::prelude::*;
3use crate::setup_tracing;
4use dotenvy::dotenv;
5use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
6use headless_lms_models::marketing_consents::UserEmailSubscription;
7use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
8use headless_lms_utils::http::REQWEST_CLIENT;
9use secrecy::ExposeSecret;
10use serde_json::json;
11use sqlx::{PgConnection, PgPool};
12use std::{
13    env,
14    time::{Duration, Instant},
15};
16use uuid::Uuid;
17
18mod batch_client;
19mod mailchimp_ops;
20mod policy_tags;
21
22use batch_client::MAX_MAILCHIMP_BATCH_SIZE;
23use mailchimp_ops::{MailchimpExecutor, MailchimpOperation};
24use policy_tags::sync_policy_tags_for_users;
25use reqwest::Method;
26
27#[derive(Debug, Deserialize)]
28struct MailchimpField {
29    field_id: String,
30    field_name: String,
31}
32
33#[derive(Debug)]
34struct FieldSchema {
35    tag: &'static str,
36    name: &'static str,
37    default_value: &'static str,
38}
39
40const REQUIRED_FIELDS: &[FieldSchema] = &[
41    FieldSchema {
42        tag: "FNAME",
43        name: "First Name",
44        default_value: "",
45    },
46    FieldSchema {
47        tag: "LNAME",
48        name: "Last Name",
49        default_value: "",
50    },
51    FieldSchema {
52        tag: "MARKETING",
53        name: "Accepts Marketing",
54        default_value: "disallowed",
55    },
56    FieldSchema {
57        tag: "LOCALE",
58        name: "Locale",
59        default_value: "en",
60    },
61    FieldSchema {
62        tag: "GRADUATED",
63        name: "Graduated",
64        default_value: "",
65    },
66    FieldSchema {
67        tag: "COURSEID",
68        name: "Course ID",
69        default_value: "",
70    },
71    FieldSchema {
72        tag: "LANGGRPID",
73        name: "Course language Group ID",
74        default_value: "",
75    },
76    FieldSchema {
77        tag: "USERID",
78        name: "User ID",
79        default_value: "",
80    },
81    FieldSchema {
82        tag: "RESEARCH",
83        name: "Research consent",
84        default_value: "false",
85    },
86];
87
/// Merge fields that are never deleted by the unsupported-field cleanup, even though they are
/// not part of `REQUIRED_FIELDS`.
const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
/// When true, `ensure_mailchimp_schema` deletes merge fields that are neither required nor
/// excluded above.
const REMOVE_UNSUPPORTED_FIELDS: bool = false;
/// Minimum time between unsubscribe-reconciliation passes (three hours).
const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 3 * 60 * 60;

/// Period of the main reconciliation loop.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of loop ticks between "Still syncing." heartbeat log lines.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;

/// Poll interval handed to `sync_policy_tags_for_users`.
const BATCH_POLL_INTERVAL_SECS: u64 = 10;
/// Overall wait limit handed to `sync_policy_tags_for_users` (five minutes).
const BATCH_POLL_TIMEOUT_SECS: u64 = 5 * 60;
98
99#[derive(Debug)]
100struct SyncUser {
101    details: UserMarketingConsentWithDetails,
102    payload: serde_json::Value,
103}
104
105#[derive(Debug)]
106struct EmailSyncResult {
107    user_id: Uuid,
108    user_mailchimp_id: String,
109}
110
111/// The main function that initializes environment variables, config, and sync process.
112pub async fn main() -> anyhow::Result<()> {
113    initialize_environment()?;
114
115    let config = initialize_configuration().await?;
116
117    let db_pool = initialize_database_pool(&config.database_url).await?;
118    let mut conn = db_pool.acquire().await?;
119
120    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
121    let mut ticks = 0;
122
123    let access_tokens =
124        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
125            &mut conn,
126        )
127        .await?;
128
129    // Iterate through access tokens and ensure Mailchimp schema is set up
130    for token in &access_tokens {
131        if let Err(e) = ensure_mailchimp_schema(
132            &token.mailchimp_mailing_list_id,
133            &token.server_prefix,
134            &token.access_token,
135        )
136        .await
137        {
138            error!(
139                "Failed to set up Mailchimp schema for list '{}': {:?}",
140                token.mailchimp_mailing_list_id, e
141            );
142            return Err(e);
143        }
144    }
145
146    info!("Starting mailchimp syncer (periodic reconciliation loop).");
147
148    let mut last_time_unsubscribes_processed = Instant::now();
149    let mut last_time_tags_synced = Instant::now();
150
151    loop {
152        interval.tick().await;
153        ticks += 1;
154
155        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
156            ticks = 0;
157            info!("Still syncing.");
158        }
159        let mut process_unsubscribes = false;
160        if last_time_unsubscribes_processed.elapsed().as_secs()
161            >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
162        {
163            process_unsubscribes = true;
164            last_time_unsubscribes_processed = Instant::now();
165        };
166
167        // Check and sync tags for this access token once every hour
168        if last_time_tags_synced.elapsed().as_secs() >= 3600 {
169            info!("Stage: tag catalog sync (keep local tag metadata aligned with Mailchimp).");
170            for token in &access_tokens {
171                if let Err(e) = sync_tags_from_mailchimp(
172                    &mut conn,
173                    &token.mailchimp_mailing_list_id,
174                    &token.access_token,
175                    &token.server_prefix,
176                    token.id,
177                    token.course_language_group_id,
178                )
179                .await
180                {
181                    error!(
182                        "Failed to sync tags for list '{}': {:?}",
183                        token.mailchimp_mailing_list_id, e
184                    );
185                }
186            }
187            last_time_tags_synced = Instant::now();
188        }
189
190        if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
191            error!("Error during synchronization: {:?}", e);
192            if let Ok(sqlx::Error::Io(..)) = e.downcast::<sqlx::Error>() {
193                // this usually happens if the database is reset while running bin/dev etc.
194                info!("syncer may have lost its connection to the db, trying to reconnect");
195                conn = db_pool.acquire().await?;
196            }
197        }
198    }
199}
200
201/// Initializes environment variables, logging, and tracing setup.
202fn initialize_environment() -> anyhow::Result<()> {
203    // TODO: Audit that the environment access only happens in single-threaded code.
204    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
205    dotenv().ok();
206    setup_tracing()?;
207    Ok(())
208}
209
210/// Structure to hold the configuration settings, such as the database URL.
211struct SyncerConfig {
212    database_url: String,
213}
214
215/// Initializes and returns configuration settings (database URL).
216async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
217    let database_url = ProgramConfig::database_url_with_default();
218
219    Ok(SyncerConfig { database_url })
220}
221
222/// Initializes the PostgreSQL connection pool from the provided database URL.
223async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
224    PgPool::connect(database_url).await.map_err(|e| {
225        anyhow::anyhow!(
226            "Failed to connect to the database at {}: {:?}",
227            database_url,
228            e
229        )
230    })
231}
232
233/// Ensures the Mailchimp schema is up to date, adding required fields and removing any extra ones.
234async fn ensure_mailchimp_schema(
235    list_id: &str,
236    server_prefix: &str,
237    access_token: &DbSecret,
238) -> anyhow::Result<()> {
239    let existing_fields =
240        fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
241
242    if REMOVE_UNSUPPORTED_FIELDS {
243        // Remove extra fields not in REQUIRED_FIELDS or FIELDS_EXCLUDED_FROM_REMOVING
244        for field in existing_fields.iter() {
245            if !REQUIRED_FIELDS
246                .iter()
247                .any(|r| r.tag == field.field_name.as_str())
248                && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
249            {
250                match remove_field_from_mailchimp(
251                    list_id,
252                    &field.field_id,
253                    server_prefix,
254                    access_token,
255                )
256                .await
257                {
258                    Err(e) => {
259                        warn!("Could not remove field '{}': {}", field.field_name, e);
260                    }
261                    _ => {
262                        info!("Removed field '{}'", field.field_name);
263                    }
264                }
265            }
266        }
267    }
268
269    // Add any required fields that are missing
270    for required_field in REQUIRED_FIELDS.iter() {
271        if !existing_fields
272            .iter()
273            .any(|f| f.field_name == required_field.tag)
274        {
275            match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
276            {
277                Err(e) => {
278                    warn!(
279                        "Failed to add required field '{}': {}",
280                        required_field.name, e
281                    );
282                }
283                _ => {
284                    info!(
285                        "Successfully added required field '{}'",
286                        required_field.name
287                    );
288                }
289            }
290        } else {
291            info!(
292                "Field '{}' already exists, skipping addition.",
293                required_field.name
294            );
295        }
296    }
297
298    Ok(())
299}
300
301/// Fetches the current merge fields from the Mailchimp list schema.
302async fn fetch_current_mailchimp_fields(
303    list_id: &str,
304    server_prefix: &str,
305    access_token: &DbSecret,
306) -> Result<Vec<MailchimpField>, anyhow::Error> {
307    let url = format!(
308        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
309        server_prefix, list_id
310    );
311
312    let response = REQWEST_CLIENT
313        .get(&url)
314        .header(
315            "Authorization",
316            format!("apikey {}", access_token.expose_secret()),
317        )
318        .send()
319        .await?;
320
321    if response.status().is_success() {
322        let json = response.json::<serde_json::Value>().await?;
323
324        let fields: Vec<MailchimpField> = json["merge_fields"]
325            .as_array()
326            .unwrap_or(&vec![])
327            .iter()
328            .filter_map(|field| {
329                let field_id = field["merge_id"].as_u64();
330                let field_name = field["tag"].as_str();
331
332                if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
333                    Some(MailchimpField {
334                        field_id: field_id.to_string(),
335                        field_name: field_name.to_string(),
336                    })
337                } else {
338                    None
339                }
340            })
341            .collect();
342
343        Ok(fields)
344    } else {
345        let error_text = response.text().await?;
346        error!("Error fetching merge fields: {}", error_text);
347        Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
348    }
349}
350
351/// Adds a new merge field to the Mailchimp list.
352async fn add_field_to_mailchimp(
353    list_id: &str,
354    field_schema: &FieldSchema,
355    server_prefix: &str,
356    access_token: &DbSecret,
357) -> anyhow::Result<()> {
358    let url = format!(
359        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
360        server_prefix, list_id
361    );
362
363    let body = json!({
364        "tag": field_schema.tag,
365        "name": field_schema.name,
366        "type": "text",
367        "default_value": field_schema.default_value,
368    });
369
370    let response = REQWEST_CLIENT
371        .post(&url)
372        .header(
373            "Authorization",
374            format!("apikey {}", access_token.expose_secret()),
375        )
376        .json(&body)
377        .send()
378        .await?;
379
380    if response.status().is_success() {
381        Ok(())
382    } else {
383        let status = response.status();
384        let error_text = response
385            .text()
386            .await
387            .unwrap_or_else(|_| "No additional error info.".to_string());
388        Err(anyhow::anyhow!(
389            "Failed to add field to Mailchimp. Status: {}. Error: {}",
390            status,
391            error_text
392        ))
393    }
394}
395
396/// Removes a merge field from the Mailchimp list by with a field ID.
397async fn remove_field_from_mailchimp(
398    list_id: &str,
399    field_id: &str,
400    server_prefix: &str,
401    access_token: &DbSecret,
402) -> anyhow::Result<()> {
403    let url = format!(
404        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
405        server_prefix, list_id, field_id
406    );
407
408    let response = REQWEST_CLIENT
409        .delete(&url)
410        .header(
411            "Authorization",
412            format!("apikey {}", access_token.expose_secret()),
413        )
414        .send()
415        .await?;
416
417    if response.status().is_success() {
418        Ok(())
419    } else {
420        let status = response.status();
421        let error_text = response
422            .text()
423            .await
424            .unwrap_or_else(|_| "No additional error info.".to_string());
425        Err(anyhow::anyhow!(
426            "Failed to remove field from Mailchimp. Status: {}. Error: {}",
427            status,
428            error_text
429        ))
430    }
431}
432
433/// Fetch tags from mailchimp and sync the changes to the database
434pub async fn sync_tags_from_mailchimp(
435    conn: &mut PgConnection,
436    list_id: &str,
437    access_token: &DbSecret,
438    server_prefix: &str,
439    marketing_mailing_list_access_token_id: Uuid,
440    course_language_group_id: Uuid,
441) -> anyhow::Result<()> {
442    let url = format!(
443        "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
444        server_prefix, list_id
445    );
446
447    let response = REQWEST_CLIENT
448        .get(&url)
449        .header(
450            "Authorization",
451            format!("apikey {}", access_token.expose_secret()),
452        )
453        .send()
454        .await?;
455
456    // Extract tags from the response
457    let response_json = response.json::<serde_json::Value>().await?;
458
459    let mailchimp_tags = match response_json.get("tags") {
460        Some(tags) if tags.is_array() => tags
461            .as_array()
462            .ok_or_else(|| anyhow::anyhow!("tags field is not an array despite is_array() check"))?
463            .iter()
464            .filter_map(|tag| {
465                let name = tag.get("name")?.as_str()?.to_string();
466
467                let id = match tag.get("id") {
468                    Some(serde_json::Value::Number(num)) => num.to_string(),
469                    Some(serde_json::Value::String(str_id)) => str_id.clone(),
470                    _ => return None,
471                };
472
473                Some((id, name))
474            })
475            .collect::<Vec<(String, String)>>(),
476        _ => {
477            warn!("No tags found for list '{}', skipping sync.", list_id);
478            return Ok(());
479        }
480    };
481
482    // Fetch the current tags from the database
483    let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
484            conn,
485            course_language_group_id,
486            marketing_mailing_list_access_token_id,
487        )
488        .await?;
489
490    // Check if any tags from Mailchimp are renamed
491    for (tag_id, tag_name) in &mailchimp_tags {
492        if let Some(db_tag) = db_tags.iter().find(|db_tag| {
493            db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
494        }) {
495            let db_tag_name = db_tag
496                .get("tag_name")
497                .and_then(|v| v.as_str())
498                .unwrap_or_default();
499            if db_tag_name != tag_name {
500                headless_lms_models::marketing_consents::upsert_tag(
501                    conn,
502                    course_language_group_id,
503                    marketing_mailing_list_access_token_id,
504                    tag_id.clone(),
505                    tag_name.clone(),
506                )
507                .await?;
508            }
509        }
510    }
511
512    // Check if any tags in the database have been removed via Mailchimp
513    for db_tag in db_tags.iter() {
514        let db_tag_id = db_tag
515            .get("id")
516            .and_then(|v| v.as_str())
517            .map(|v| v.trim().to_string())
518            .unwrap_or_default();
519
520        // Check if this tag exists in the Mailchimp tags
521        if !mailchimp_tags
522            .iter()
523            .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
524        {
525            if db_tag_id.is_empty() {
526                warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
527                continue;
528            }
529            headless_lms_models::marketing_consents::delete_tag(
530                conn,
531                db_tag_id.clone(),
532                course_language_group_id,
533            )
534            .await?;
535        }
536    }
537
538    Ok(())
539}
540
/// Runs one contact synchronization pass for every configured mailing list.
///
/// For each access token (re-read from the database on every call), the pass does three things:
/// 1. If `process_unsubscribes` is true, it fetches unsubscribed members from Mailchimp and
///    applies them locally.
/// 2. It pushes users with unsynced email changes to Mailchimp and re-syncs their policy tags.
/// 3. It upserts users with unsynced marketing consents via `send_users_to_mailchimp`. When the
///    course language group has a slug, it also syncs the completed tag for those users.
///
/// After all lists are processed, every user id collected in steps 2 and 3 is marked as synced in
/// one database update. A failure of that update is logged, not returned.
///
/// `_config` is currently unused.
///
/// # Errors
/// Errors from the database queries, the unsubscribe fetch/processing, the email update, and the
/// member upsert are returned immediately via `?`. Policy-tag and completed-tag failures are only
/// logged.
///
/// NOTE(review): an early return skips the remaining lists for this pass and the final
/// "mark synced" update. Users already pushed for earlier lists are therefore presumably sent
/// again on the next pass.
async fn sync_contacts(
    conn: &mut PgConnection,
    _config: &SyncerConfig,
    process_unsubscribes: bool,
) -> anyhow::Result<()> {
    // Tokens are re-read on every pass, so lists added at runtime are picked up here.
    let access_tokens =
        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
            conn,
        )
        .await?;

    // User ids to mark as synced after all lists have been processed (collected across lists).
    let mut successfully_synced_user_ids = Vec::new();

    // Iterate through tokens and fetch and send user details to Mailchimp
    for token in access_tokens {
        // The slug is only needed for the completed-tag sync below. A missing slug (Ok(None))
        // just skips that step, while a query error aborts the whole pass.
        let course_language_group_slug =
            match headless_lms_models::course_language_groups::get_slug_by_id(
                conn,
                token.course_language_group_id,
            )
            .await
            {
                Ok(Some(s)) => Some(s),
                Ok(None) => None,
                Err(e) => {
                    error!(
                        course_language_group_id = %token.course_language_group_id,
                        "Failed to get course language group slug: {:?}",
                        e
                    );
                    return Err(e.into());
                }
            };

        // Fetch all users from Mailchimp and sync possible changes locally
        if process_unsubscribes {
            info!(
                "Stage: unsubscribe sync (apply Mailchimp compliance/unsubscribe changes locally)."
            );
            // NOTE(review): 1000 is presumably the chunk/page size; confirm in the helper.
            let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
                &token.mailchimp_mailing_list_id,
                &token.server_prefix,
                &token.access_token,
                1000,
            )
            .await?;

            info!(
                "Processing Mailchimp data for list: {}",
                token.mailchimp_mailing_list_id
            );

            process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
        }

        // Fetch unsynced emails and update them in Mailchimp
        let users_with_unsynced_emails =
            headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
                conn,
                token.course_language_group_id,
            )
            .await?;

        info!(
            "Stage: email updates (ensure member identifiers stay correct). Found {} unsynced user email(s) for course language group: {}",
            users_with_unsynced_emails.len(),
            token.course_language_group_id
        );

        if !users_with_unsynced_emails.is_empty() {
            let email_sync_results = update_emails_in_mailchimp(
                users_with_unsynced_emails,
                &token.mailchimp_mailing_list_id,
                &token.server_prefix,
                &token.access_token,
            )
            .await?;

            // These users are queued for "mark synced" before the policy-tag sync runs, so a
            // policy-tag failure below does not keep them unsynced.
            let email_synced_user_ids: Vec<Uuid> =
                email_sync_results.iter().map(|r| r.user_id).collect();
            successfully_synced_user_ids.extend(email_synced_user_ids.clone());

            if !email_sync_results.is_empty() {
                let mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
                    email_sync_results
                        .iter()
                        .map(|r| (r.user_id, r.user_mailchimp_id.clone()))
                        .collect();
                let user_details =
                    headless_lms_models::marketing_consents::fetch_user_marketing_consents_with_details_by_user_ids(
                        conn,
                        token.course_language_group_id,
                        &email_synced_user_ids,
                    )
                    .await?;
                // Policy-tag sync failures are logged only; they never abort the pass.
                match sync_policy_tags_for_users(
                    &token,
                    &user_details,
                    &mailchimp_id_by_user,
                    Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
                    Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
                )
                .await
                {
                    Ok(results) => {
                        let (ok, failed): (Vec<_>, Vec<_>) =
                            results.into_iter().partition(|r| r.success);
                        if !failed.is_empty() {
                            // Log at most 5 sample failures to keep the warning readable.
                            let sample: Vec<String> = failed
                                .iter()
                                .take(5)
                                .map(|r| {
                                    format!(
                                        "{}: {}",
                                        r.user_id,
                                        r.error.as_deref().unwrap_or("unknown error")
                                    )
                                })
                                .collect();
                            warn!(
                                "Policy tag sync after email updates for list '{}' had {} failure(s) out of {}. Sample: {:?}",
                                token.mailchimp_mailing_list_id,
                                failed.len(),
                                ok.len() + failed.len(),
                                sample
                            );
                        }
                    }
                    Err(e) => {
                        error!(
                            "Failed to sync policy tags after email updates for list '{}': {:?}",
                            token.mailchimp_mailing_list_id, e
                        );
                    }
                }
            }
        }

        // Local tag objects for this list; `send_users_to_mailchimp` attaches their names to
        // every member it upserts.
        let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;

        // Fetch unsynced user consents and update them in Mailchimp
        let unsynced_users_details =
            headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
                conn,
                token.course_language_group_id,
            )
            .await?;

        info!(
            "Stage: member upsert (merge fields + consent). Found {} unsynced user consent(s) for course language group: {}",
            unsynced_users_details.len(),
            token.course_language_group_id
        );

        if !unsynced_users_details.is_empty() {
            let consent_synced_user_ids =
                send_users_to_mailchimp(conn, &token, &unsynced_users_details, tag_objects).await?;

            // The completed tag needs the group slug; without a slug, or with no synced users,
            // this step is skipped. Failures of the tag sync itself are logged only.
            if let Some(ref slug) = course_language_group_slug
                && !consent_synced_user_ids.is_empty()
            {
                let mailchimp_id_mapping =
                    headless_lms_models::marketing_consents::fetch_user_mailchimp_id_mapping(
                        conn,
                        token.course_language_group_id,
                        &consent_synced_user_ids,
                    )
                    .await?;
                if let Err(e) = sync_completed_tag_for_members(
                    &unsynced_users_details,
                    &consent_synced_user_ids,
                    &mailchimp_id_mapping,
                    slug,
                    &token,
                )
                .await
                {
                    error!(
                        "Failed to sync completed tag for list '{}': {:?}",
                        token.mailchimp_mailing_list_id, e
                    );
                }
            }

            // Store the successfully synced user IDs from syncing user consents
            successfully_synced_user_ids.extend(consent_synced_user_ids);
        }
    }

    // If there are any successfully synced users, update the database to mark them as synced.
    // A failure here is logged only; the affected users presumably stay unsynced and are picked
    // up again on the next pass.
    if !successfully_synced_user_ids.is_empty() {
        match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
        conn,
        &successfully_synced_user_ids,
    )
    .await
    {
        Ok(_) => {
            info!(
                "Stage: mark synced (avoid repeat work). Successfully updated synced status for {} users.",
                successfully_synced_user_ids.len()
            );
        }
        Err(e) => {
            error!(
                "Failed to update synced status for {} users: {:?}",
                successfully_synced_user_ids.len(),
                e
            );
        }
    }
    }

    Ok(())
}
758
759/// Sends a batch of users to Mailchimp for synchronization.
760pub async fn send_users_to_mailchimp(
761    conn: &mut PgConnection,
762    token: &MarketingMailingListAccessToken,
763    users_details: &[UserMarketingConsentWithDetails],
764    tag_objects: Vec<serde_json::Value>,
765) -> anyhow::Result<Vec<Uuid>> {
766    let mut users_to_sync = vec![];
767    let mut sent_user_ids = Vec::new();
768    let mut successfully_synced_user_ids = Vec::new();
769    let mut user_id_contact_id_pairs = Vec::new();
770
771    // Prepare each user's data for Mailchimp
772    for user in users_details {
773        // Check user has given permission to send data to mailchimp
774        if let Some(ref subscription) = user.email_subscription_in_mailchimp
775            && subscription == "subscribed"
776        {
777            sent_user_ids.push(user.user_id);
778            let user_details = json!({
779                "email_address": user.email,
780                "status": user.email_subscription_in_mailchimp,
781                "merge_fields": {
782                    "FNAME": user.first_name.clone().unwrap_or("".to_string()),
783                    "LNAME": user.last_name.clone().unwrap_or("".to_string()),
784                    "MARKETING": if user.consent { "allowed" } else { "disallowed" },
785                    "LOCALE": user.locale,
786                    "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
787                    "USERID": user.user_id,
788                    "COURSEID": user.course_id,
789                    "LANGGRPID": user.course_language_group_id,
790                    "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
791                    "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
792                },
793               "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
794            });
795            users_to_sync.push(SyncUser {
796                details: user.clone(),
797                payload: user_details,
798            });
799        }
800    }
801
802    if users_to_sync.is_empty() {
803        info!("No new users to sync.");
804        return Ok(vec![]);
805    }
806
807    let url = format!(
808        "https://{}.api.mailchimp.com/3.0/lists/{}",
809        token.server_prefix, token.mailchimp_mailing_list_id
810    );
811
812    let total_chunks = users_to_sync.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
813    info!(
814        "Syncing {} members to list '{}' in {} chunk(s)",
815        users_to_sync.len(),
816        token.mailchimp_mailing_list_id,
817        total_chunks
818    );
819
820    for (chunk_index, chunk) in users_to_sync.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
821        info!(
822            "Syncing users chunk {}/{} ({} members)",
823            chunk_index + 1,
824            total_chunks,
825            chunk.len()
826        );
827        let chunk_members: Vec<serde_json::Value> =
828            chunk.iter().map(|user| user.payload.clone()).collect();
829        let batch_request = json!({
830            "members": chunk_members,
831            "update_existing": true
832        });
833
834        let response = REQWEST_CLIENT
835            .post(&url)
836            .header("Content-Type", "application/json")
837            .header(
838                "Authorization",
839                format!("apikey {}", token.access_token.expose_secret()),
840            )
841            .json(&batch_request)
842            .send()
843            .await?;
844
845        if !response.status().is_success() {
846            let status = response.status();
847            let error_text = response.text().await?;
848            return Err(anyhow::anyhow!(
849                "Error syncing users to Mailchimp. Status: {}. Error: {}",
850                status,
851                error_text
852            ));
853        }
854
855        let response_data: serde_json::Value = response.json().await?;
856        let mut chunk_contact_count = 0;
857        let mut chunk_mailchimp_id_by_user: std::collections::HashMap<Uuid, String> =
858            std::collections::HashMap::new();
859        for user in chunk {
860            if let Some(ref mailchimp_id) = user.details.user_mailchimp_id {
861                chunk_mailchimp_id_by_user
862                    .entry(user.details.user_id)
863                    .or_insert_with(|| mailchimp_id.clone());
864            }
865        }
866        for key in &["new_members", "updated_members"] {
867            if let Some(members) = response_data[key].as_array() {
868                for member in members {
869                    if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
870                        if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
871                            successfully_synced_user_ids.push(uuid);
872                        }
873                        if let Some(contact_id) = member["contact_id"].as_str() {
874                            user_id_contact_id_pairs
875                                .push((user_id.to_string(), contact_id.to_string()));
876                            if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
877                                chunk_mailchimp_id_by_user.insert(uuid, contact_id.to_string());
878                            }
879                            chunk_contact_count += 1;
880                        }
881                    }
882                }
883            }
884        }
885        if let Some(errors) = response_data["errors"].as_array()
886            && !errors.is_empty()
887        {
888            let sample: Vec<String> = errors
889                .iter()
890                .take(5)
891                .map(|e| {
892                    let email = e
893                        .get("email_address")
894                        .and_then(|v| v.as_str())
895                        .unwrap_or("?");
896                    let msg = e.get("error").and_then(|v| v.as_str()).unwrap_or_else(|| {
897                        e.get("message").and_then(|v| v.as_str()).unwrap_or("?")
898                    });
899                    format!("{}: {}", email, msg)
900                })
901                .collect();
902            warn!(
903                "Mailchimp batch subscribe chunk {}/{} returned {} error(s) (e.g. unsubscribed/rejected). Sample: {:?}",
904                chunk_index + 1,
905                total_chunks,
906                errors.len(),
907                sample
908            );
909        }
910        info!(
911            "Chunk {}/{}: {} contact_id(s) from new_members/updated_members",
912            chunk_index + 1,
913            total_chunks,
914            chunk_contact_count
915        );
916
917        let chunk_users: Vec<UserMarketingConsentWithDetails> =
918            chunk.iter().map(|user| user.details.clone()).collect();
919        match sync_policy_tags_for_users(
920            token,
921            &chunk_users,
922            &chunk_mailchimp_id_by_user,
923            Duration::from_secs(BATCH_POLL_TIMEOUT_SECS),
924            Duration::from_secs(BATCH_POLL_INTERVAL_SECS),
925        )
926        .await
927        {
928            Ok(results) => {
929                let (ok, failed): (Vec<_>, Vec<_>) = results.into_iter().partition(|r| r.success);
930                if !failed.is_empty() {
931                    let sample: Vec<String> = failed
932                        .iter()
933                        .take(5)
934                        .map(|r| {
935                            format!(
936                                "{}: {}",
937                                r.user_id,
938                                r.error.as_deref().unwrap_or("unknown error")
939                            )
940                        })
941                        .collect();
942                    warn!(
943                        "Policy tag sync for list '{}' chunk {}/{} had {} failure(s) out of {}. Sample: {:?}",
944                        token.mailchimp_mailing_list_id,
945                        chunk_index + 1,
946                        total_chunks,
947                        failed.len(),
948                        ok.len() + failed.len(),
949                        sample
950                    );
951                }
952            }
953            Err(e) => {
954                error!(
955                    "Failed to sync policy tags for list '{}' chunk {}/{}: {:?}",
956                    token.mailchimp_mailing_list_id,
957                    chunk_index + 1,
958                    total_chunks,
959                    e
960                );
961            }
962        }
963    }
964
965    let got_contact_id_set: std::collections::HashSet<Uuid> = user_id_contact_id_pairs
966        .iter()
967        .filter_map(|(uid, _)| Uuid::parse_str(uid).ok())
968        .collect();
969    let no_contact_id_user_ids: Vec<Uuid> = sent_user_ids
970        .iter()
971        .filter(|id| !got_contact_id_set.contains(id))
972        .copied()
973        .collect();
974
975    if !no_contact_id_user_ids.is_empty() {
976        let sample_len = no_contact_id_user_ids.len().min(10);
977        warn!(
978            "Mailchimp did not return contact_id for {} member(s) (likely unsubscribed, removed, or rejected). Marking synced_to_mailchimp_at to stop retry. First {} user_id(s): {:?}",
979            no_contact_id_user_ids.len(),
980            sample_len,
981            &no_contact_id_user_ids[..sample_len]
982        );
983        if let Err(e) = headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
984            conn,
985            &no_contact_id_user_ids,
986        )
987        .await
988        {
989            error!(
990                "Failed to update synced_to_mailchimp_at for no-contact_id users: {:?}",
991                e
992            );
993        }
994    }
995
996    info!(
997        "Batch subscribe list '{}': sent {} member(s), got {} contact_id(s){}",
998        token.mailchimp_mailing_list_id,
999        sent_user_ids.len(),
1000        user_id_contact_id_pairs.len(),
1001        if no_contact_id_user_ids.is_empty() {
1002            String::new()
1003        } else {
1004            format!(
1005                ", {} without contact_id (marked synced to stop retry)",
1006                no_contact_id_user_ids.len()
1007            )
1008        }
1009    );
1010
1011    if !user_id_contact_id_pairs.is_empty() {
1012        headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
1013            conn,
1014            user_id_contact_id_pairs,
1015        )
1016        .await?;
1017    }
1018
1019    Ok(successfully_synced_user_ids)
1020}
1021
1022/// Sets or removes the "{slug}-completed" tag on Mailchimp members based on course completion. Skips users without user_mailchimp_id.
1023async fn sync_completed_tag_for_members(
1024    users_details: &[UserMarketingConsentWithDetails],
1025    successfully_synced_user_ids: &[Uuid],
1026    mailchimp_id_by_user: &std::collections::HashMap<Uuid, String>,
1027    slug: &str,
1028    token: &MarketingMailingListAccessToken,
1029) -> anyhow::Result<()> {
1030    let tag_name = format!("{}-completed", slug);
1031    let success_set: std::collections::HashSet<_> = successfully_synced_user_ids.iter().collect();
1032
1033    let mut operations = Vec::new();
1034    for user in users_details {
1035        if !success_set.contains(&user.user_id) {
1036            continue;
1037        }
1038        let Some(user_mailchimp_id) = mailchimp_id_by_user.get(&user.user_id) else {
1039            continue;
1040        };
1041        let status = if user.completed_course_at.is_some() {
1042            "active"
1043        } else {
1044            "inactive"
1045        };
1046        operations.push(MailchimpOperation {
1047            method: Method::POST,
1048            path: format!(
1049                "/lists/{}/members/{}/tags",
1050                token.mailchimp_mailing_list_id, user_mailchimp_id
1051            ),
1052            body: Some(json!({
1053                "tags": [
1054                    { "name": tag_name, "status": status }
1055                ]
1056            })),
1057            operation_id: Some(user.user_id.to_string()),
1058        });
1059    }
1060
1061    if operations.is_empty() {
1062        return Ok(());
1063    }
1064
1065    let ops_count = operations.len();
1066    info!(
1067        "Stage: completion tags (mark course completion). Preparing {} operation(s) for list '{}'",
1068        ops_count, token.mailchimp_mailing_list_id
1069    );
1070
1071    let timeout = Duration::from_secs(BATCH_POLL_TIMEOUT_SECS);
1072    let poll_interval = Duration::from_secs(BATCH_POLL_INTERVAL_SECS);
1073    let executor = MailchimpExecutor::new(timeout, poll_interval);
1074
1075    let start_time = Instant::now();
1076    let results = executor.execute(token, operations).await?;
1077    let failures: Vec<_> = results.iter().filter(|r| !r.is_success()).collect();
1078    if !failures.is_empty() {
1079        let sample: Vec<String> = failures
1080            .iter()
1081            .take(5)
1082            .map(|r| {
1083                format!(
1084                    "{}: {}",
1085                    r.operation_id.as_deref().unwrap_or("?"),
1086                    r.error.as_deref().unwrap_or("unknown error")
1087                )
1088            })
1089            .collect();
1090        warn!(
1091            "Completion tag sync for list '{}' had {} failure(s) out of {}. Sample: {:?}",
1092            token.mailchimp_mailing_list_id,
1093            failures.len(),
1094            results.len(),
1095            sample
1096        );
1097        return Err(anyhow::anyhow!("completion tag sync failed"));
1098    }
1099
1100    info!(
1101        "Completed sync of {} completion tags for list '{}' in {:.2}s",
1102        ops_count,
1103        token.mailchimp_mailing_list_id,
1104        start_time.elapsed().as_secs_f64()
1105    );
1106
1107    Ok(())
1108}
1109
1110/// Updates the email addresses of multiple users in a Mailchimp mailing list.
1111async fn update_emails_in_mailchimp(
1112    users: Vec<UserEmailSubscription>,
1113    list_id: &str,
1114    server_prefix: &str,
1115    access_token: &DbSecret,
1116) -> anyhow::Result<Vec<EmailSyncResult>> {
1117    let mut successfully_synced_users = Vec::new();
1118    let mut failed_user_ids = Vec::new();
1119
1120    for user in users {
1121        if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
1122            if let Some(ref status) = user.email_subscription_in_mailchimp
1123                && status != "subscribed"
1124            {
1125                continue; // Skip this user if they are not subscribed because Mailchimp only updates emails that are subscribed
1126            }
1127
1128            let url = format!(
1129                "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
1130                server_prefix, list_id, user_mailchimp_id
1131            );
1132
1133            // Prepare the body for the PUT request
1134            let body = serde_json::json!({
1135                "email_address": &user.email,
1136                "status": &user.email_subscription_in_mailchimp,
1137            });
1138
1139            // Update the email
1140            let update_response = REQWEST_CLIENT
1141                .put(&url)
1142                .header(
1143                    "Authorization",
1144                    format!("apikey {}", access_token.expose_secret()),
1145                )
1146                .json(&body)
1147                .send()
1148                .await?;
1149
1150            if update_response.status().is_success() {
1151                successfully_synced_users.push(EmailSyncResult {
1152                    user_id: user.user_id,
1153                    user_mailchimp_id: user_mailchimp_id.clone(),
1154                });
1155            } else {
1156                failed_user_ids.push(user.user_id);
1157            }
1158        } else {
1159            continue;
1160        }
1161    }
1162
1163    if !failed_user_ids.is_empty() {
1164        info!("Failed to update the following users:");
1165        for user_id in &failed_user_ids {
1166            error!("User ID: {}", user_id);
1167        }
1168    }
1169
1170    Ok(successfully_synced_users)
1171}
1172
1173/// Fetches data from Mailchimp in chunks.
1174async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
1175    list_id: &str,
1176    server_prefix: &str,
1177    access_token: &DbSecret,
1178    chunk_size: usize,
1179) -> anyhow::Result<Vec<(String, String, String, String)>> {
1180    let mut all_data = Vec::new();
1181    let mut offset = 0;
1182
1183    loop {
1184        let url = format!(
1185            "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
1186            server_prefix, list_id, offset, chunk_size
1187        );
1188
1189        let response = REQWEST_CLIENT
1190            .get(&url)
1191            .header(
1192                "Authorization",
1193                format!("apikey {}", access_token.expose_secret()),
1194            )
1195            .send()
1196            .await?
1197            .json::<serde_json::Value>()
1198            .await?;
1199
1200        let empty_vec = vec![];
1201        let members = response["members"].as_array().unwrap_or(&empty_vec);
1202        if members.is_empty() {
1203            break;
1204        }
1205
1206        for member in members {
1207            // Process the member, but only if necessary fields are present and valid
1208            if let (Some(status), Some(last_changed), Some(merge_fields)) = (
1209                member["status"].as_str(),
1210                member["last_changed"].as_str(),
1211                member["merge_fields"].as_object(),
1212            ) {
1213                // Ensure both USERID and LANGGRPID are present and valid
1214                if let (Some(user_id), Some(language_group_id)) = (
1215                    merge_fields.get("USERID").and_then(|v| v.as_str()),
1216                    merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
1217                ) {
1218                    // Avoid adding data if any field is missing or empty
1219                    if !user_id.is_empty() && !language_group_id.is_empty() {
1220                        all_data.push((
1221                            user_id.to_string(),
1222                            last_changed.to_string(),
1223                            language_group_id.to_string(),
1224                            status.to_string(),
1225                        ));
1226                    }
1227                }
1228            }
1229        }
1230
1231        // Check the pagination info from the response
1232        let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
1233        if offset + chunk_size >= total_items {
1234            break;
1235        }
1236
1237        offset += chunk_size;
1238    }
1239
1240    Ok(all_data)
1241}
1242
/// Maximum number of Mailchimp member records handed to the database in one bulk update call
/// (used as the chunk size in `process_unsubscribed_users_from_mailchimp`).
const BATCH_SIZE: usize = 1000;
1244
1245async fn process_unsubscribed_users_from_mailchimp(
1246    conn: &mut PgConnection,
1247    mailchimp_data: Vec<(String, String, String, String)>,
1248) -> anyhow::Result<()> {
1249    let total_records = mailchimp_data.len();
1250    let total_chunks = total_records.div_ceil(BATCH_SIZE);
1251
1252    for (chunk_num, chunk) in mailchimp_data.chunks(BATCH_SIZE).enumerate() {
1253        if chunk.is_empty() {
1254            continue;
1255        }
1256
1257        if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
1258            conn,
1259            chunk.to_vec(),
1260        )
1261        .await
1262        {
1263            error!(
1264                "Error while processing chunk {}/{}: {}",
1265                chunk_num + 1,
1266                total_chunks,
1267                e
1268            );
1269        }
1270    }
1271
1272    Ok(())
1273}