headless_lms_server/programs/
mailchimp_syncer.rs

1use crate::setup_tracing;
2use dotenv::dotenv;
3use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
4use headless_lms_models::marketing_consents::UserEmailSubscription;
5use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
6use headless_lms_utils::http::REQWEST_CLIENT;
7use serde::Deserialize;
8use serde_json::json;
9use sqlx::{PgConnection, PgPool};
10use std::{
11    env,
12    time::{Duration, Instant},
13};
14use uuid::Uuid;
15
16#[derive(Debug, Deserialize)]
17struct MailchimpField {
18    field_id: String,
19    field_name: String,
20}
21
/// Definition of a text merge field that the syncer expects every Mailchimp list to have.
#[derive(Debug)]
struct FieldSchema {
    /// Merge tag used as the key inside `merge_fields`, e.g. `FNAME`.
    tag: &'static str,
    /// Human-readable field name sent to Mailchimp when the field is created.
    name: &'static str,
    /// Sent as the field's `default_value` when the field is created.
    default_value: &'static str,
}
28
29const REQUIRED_FIELDS: &[FieldSchema] = &[
30    FieldSchema {
31        tag: "FNAME",
32        name: "First Name",
33        default_value: "",
34    },
35    FieldSchema {
36        tag: "LNAME",
37        name: "Last Name",
38        default_value: "",
39    },
40    FieldSchema {
41        tag: "MARKETING",
42        name: "Accepts Marketing",
43        default_value: "disallowed",
44    },
45    FieldSchema {
46        tag: "LOCALE",
47        name: "Locale",
48        default_value: "en",
49    },
50    FieldSchema {
51        tag: "GRADUATED",
52        name: "Graduated",
53        default_value: "",
54    },
55    FieldSchema {
56        tag: "COURSEID",
57        name: "Course ID",
58        default_value: "",
59    },
60    FieldSchema {
61        tag: "LANGGRPID",
62        name: "Course language Group ID",
63        default_value: "",
64    },
65    FieldSchema {
66        tag: "USERID",
67        name: "User ID",
68        default_value: "",
69    },
70    FieldSchema {
71        tag: "RESEARCH",
72        name: "Research consent",
73        default_value: "false",
74    },
75];
76
/// Merge-field tags that `ensure_mailchimp_schema` never deletes even though they are not in
/// `REQUIRED_FIELDS`. Only relevant when `REMOVE_UNSUPPORTED_FIELDS` is enabled.
const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
/// Whether `ensure_mailchimp_schema` deletes merge fields that are neither required nor
/// excluded above. Currently off, so unknown fields are left untouched.
const REMOVE_UNSUPPORTED_FIELDS: bool = false;
/// Minimum number of seconds between two runs of unsubscribe processing (three hours).
const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 3 * 60 * 60;

/// Seconds between two iterations of the main sync loop.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of loop iterations between two "Still syncing." log lines.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
84
85/// The main function that initializes environment variables, config, and sync process.
86pub async fn main() -> anyhow::Result<()> {
87    initialize_environment()?;
88
89    let config = initialize_configuration().await?;
90
91    let db_pool = initialize_database_pool(&config.database_url).await?;
92    let mut conn = db_pool.acquire().await?;
93
94    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
95    let mut ticks = 0;
96
97    let access_tokens =
98        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
99            &mut conn,
100        )
101        .await?;
102
103    // Iterate through access tokens and ensure Mailchimp schema is set up
104    for token in &access_tokens {
105        if let Err(e) = ensure_mailchimp_schema(
106            &token.mailchimp_mailing_list_id,
107            &token.server_prefix,
108            &token.access_token,
109        )
110        .await
111        {
112            error!(
113                "Failed to set up Mailchimp schema for list '{}': {:?}",
114                token.mailchimp_mailing_list_id, e
115            );
116            return Err(e);
117        }
118    }
119
120    info!("Starting mailchimp syncer.");
121
122    let mut last_time_unsubscribes_processed = Instant::now();
123    let mut last_time_tags_synced = Instant::now();
124
125    loop {
126        interval.tick().await;
127        ticks += 1;
128
129        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
130            ticks = 0;
131            info!("Still syncing.");
132        }
133        let mut process_unsubscribes = false;
134        if last_time_unsubscribes_processed.elapsed().as_secs()
135            >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
136        {
137            process_unsubscribes = true;
138            last_time_unsubscribes_processed = Instant::now();
139        };
140
141        // Check and sync tags for this access token once every hour
142        if last_time_tags_synced.elapsed().as_secs() >= 3600 {
143            info!("Syncing tags from Mailchimp...");
144            for token in &access_tokens {
145                if let Err(e) = sync_tags_from_mailchimp(
146                    &mut conn,
147                    &token.mailchimp_mailing_list_id,
148                    &token.access_token,
149                    &token.server_prefix,
150                    token.id,
151                    token.course_language_group_id,
152                )
153                .await
154                {
155                    error!(
156                        "Failed to sync tags for list '{}': {:?}",
157                        token.mailchimp_mailing_list_id, e
158                    );
159                }
160            }
161            last_time_tags_synced = Instant::now();
162        }
163
164        if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
165            error!("Error during synchronization: {:?}", e);
166            if let Ok(sqlx::Error::Io(..)) = e.downcast::<sqlx::Error>() {
167                // this usually happens if the database is reset while running bin/dev etc.
168                info!("syncer may have lost its connection to the db, trying to reconnect");
169                conn = db_pool.acquire().await?;
170            }
171        }
172    }
173}
174
175/// Initializes environment variables, logging, and tracing setup.
176fn initialize_environment() -> anyhow::Result<()> {
177    // TODO: Audit that the environment access only happens in single-threaded code.
178    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
179    dotenv().ok();
180    setup_tracing()?;
181    Ok(())
182}
183
/// Runtime settings for the syncer.
struct SyncerConfig {
    /// PostgreSQL connection string used to build the connection pool.
    database_url: String,
}
188
189/// Initializes and returns configuration settings (database URL).
190async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
191    let database_url = env::var("DATABASE_URL")
192        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
193
194    Ok(SyncerConfig { database_url })
195}
196
197/// Initializes the PostgreSQL connection pool from the provided database URL.
198async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
199    PgPool::connect(database_url).await.map_err(|e| {
200        anyhow::anyhow!(
201            "Failed to connect to the database at {}: {:?}",
202            database_url,
203            e
204        )
205    })
206}
207
208/// Ensures the Mailchimp schema is up to date, adding required fields and removing any extra ones.
209async fn ensure_mailchimp_schema(
210    list_id: &str,
211    server_prefix: &str,
212    access_token: &str,
213) -> anyhow::Result<()> {
214    let existing_fields =
215        fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
216
217    if REMOVE_UNSUPPORTED_FIELDS {
218        // Remove extra fields not in REQUIRED_FIELDS or FIELDS_EXCLUDED_FROM_REMOVING
219        for field in existing_fields.iter() {
220            if !REQUIRED_FIELDS
221                .iter()
222                .any(|r| r.tag == field.field_name.as_str())
223                && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
224            {
225                match remove_field_from_mailchimp(
226                    list_id,
227                    &field.field_id,
228                    server_prefix,
229                    access_token,
230                )
231                .await
232                {
233                    Err(e) => {
234                        warn!("Could not remove field '{}': {}", field.field_name, e);
235                    }
236                    _ => {
237                        info!("Removed field '{}'", field.field_name);
238                    }
239                }
240            }
241        }
242    }
243
244    // Add any required fields that are missing
245    for required_field in REQUIRED_FIELDS.iter() {
246        if !existing_fields
247            .iter()
248            .any(|f| f.field_name == required_field.tag)
249        {
250            match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
251            {
252                Err(e) => {
253                    warn!(
254                        "Failed to add required field '{}': {}",
255                        required_field.name, e
256                    );
257                }
258                _ => {
259                    info!(
260                        "Successfully added required field '{}'",
261                        required_field.name
262                    );
263                }
264            }
265        } else {
266            info!(
267                "Field '{}' already exists, skipping addition.",
268                required_field.name
269            );
270        }
271    }
272
273    Ok(())
274}
275
276/// Fetches the current merge fields from the Mailchimp list schema.
277async fn fetch_current_mailchimp_fields(
278    list_id: &str,
279    server_prefix: &str,
280    access_token: &str,
281) -> Result<Vec<MailchimpField>, anyhow::Error> {
282    let url = format!(
283        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
284        server_prefix, list_id
285    );
286
287    let response = REQWEST_CLIENT
288        .get(&url)
289        .header("Authorization", format!("apikey {}", access_token))
290        .send()
291        .await?;
292
293    if response.status().is_success() {
294        let json = response.json::<serde_json::Value>().await?;
295
296        let fields: Vec<MailchimpField> = json["merge_fields"]
297            .as_array()
298            .unwrap_or(&vec![])
299            .iter()
300            .filter_map(|field| {
301                let field_id = field["merge_id"].as_u64();
302                let field_name = field["tag"].as_str();
303
304                if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
305                    Some(MailchimpField {
306                        field_id: field_id.to_string(),
307                        field_name: field_name.to_string(),
308                    })
309                } else {
310                    None
311                }
312            })
313            .collect();
314
315        Ok(fields)
316    } else {
317        let error_text = response.text().await?;
318        error!("Error fetching merge fields: {}", error_text);
319        Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
320    }
321}
322
323/// Adds a new merge field to the Mailchimp list.
324async fn add_field_to_mailchimp(
325    list_id: &str,
326    field_schema: &FieldSchema,
327    server_prefix: &str,
328    access_token: &str,
329) -> anyhow::Result<()> {
330    let url = format!(
331        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
332        server_prefix, list_id
333    );
334
335    let body = json!({
336        "tag": field_schema.tag,
337        "name": field_schema.name,
338        "type": "text",
339        "default_value": field_schema.default_value,
340    });
341
342    let response = REQWEST_CLIENT
343        .post(&url)
344        .header("Authorization", format!("apikey {}", access_token))
345        .json(&body)
346        .send()
347        .await?;
348
349    if response.status().is_success() {
350        Ok(())
351    } else {
352        let status = response.status();
353        let error_text = response
354            .text()
355            .await
356            .unwrap_or_else(|_| "No additional error info.".to_string());
357        Err(anyhow::anyhow!(
358            "Failed to add field to Mailchimp. Status: {}. Error: {}",
359            status,
360            error_text
361        ))
362    }
363}
364
365/// Removes a merge field from the Mailchimp list by with a field ID.
366async fn remove_field_from_mailchimp(
367    list_id: &str,
368    field_id: &str,
369    server_prefix: &str,
370    access_token: &str,
371) -> anyhow::Result<()> {
372    let url = format!(
373        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
374        server_prefix, list_id, field_id
375    );
376
377    let response = REQWEST_CLIENT
378        .delete(&url)
379        .header("Authorization", format!("apikey {}", access_token))
380        .send()
381        .await?;
382
383    if response.status().is_success() {
384        Ok(())
385    } else {
386        let status = response.status();
387        let error_text = response
388            .text()
389            .await
390            .unwrap_or_else(|_| "No additional error info.".to_string());
391        Err(anyhow::anyhow!(
392            "Failed to remove field from Mailchimp. Status: {}. Error: {}",
393            status,
394            error_text
395        ))
396    }
397}
398
399/// Fetch tags from mailchimp and sync the changes to the database
400pub async fn sync_tags_from_mailchimp(
401    conn: &mut PgConnection,
402    list_id: &str,
403    access_token: &str,
404    server_prefix: &str,
405    marketing_mailing_list_access_token_id: Uuid,
406    course_language_group_id: Uuid,
407) -> anyhow::Result<()> {
408    let url = format!(
409        "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
410        server_prefix, list_id
411    );
412
413    let response = REQWEST_CLIENT
414        .get(&url)
415        .header("Authorization", format!("apikey {}", access_token))
416        .send()
417        .await?;
418
419    // Extract tags from the response
420    let response_json = response.json::<serde_json::Value>().await?;
421
422    let mailchimp_tags = match response_json.get("tags") {
423        Some(tags) if tags.is_array() => tags
424            .as_array()
425            .unwrap()
426            .iter()
427            .filter_map(|tag| {
428                let name = tag.get("name")?.as_str()?.to_string();
429
430                let id = match tag.get("id") {
431                    Some(serde_json::Value::Number(num)) => num.to_string(),
432                    Some(serde_json::Value::String(str_id)) => str_id.clone(),
433                    _ => return None,
434                };
435
436                Some((id, name))
437            })
438            .collect::<Vec<(String, String)>>(),
439        _ => {
440            warn!("No tags found for list '{}', skipping sync.", list_id);
441            return Ok(());
442        }
443    };
444
445    // Fetch the current tags from the database
446    let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
447            conn,
448            course_language_group_id,
449            marketing_mailing_list_access_token_id,
450        )
451        .await?;
452
453    // Check if any tags from Mailchimp are renamed
454    for (tag_id, tag_name) in &mailchimp_tags {
455        if let Some(db_tag) = db_tags.iter().find(|db_tag| {
456            db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
457        }) {
458            let db_tag_name = db_tag
459                .get("tag_name")
460                .and_then(|v| v.as_str())
461                .unwrap_or_default();
462            if db_tag_name != tag_name {
463                headless_lms_models::marketing_consents::upsert_tag(
464                    conn,
465                    course_language_group_id,
466                    marketing_mailing_list_access_token_id,
467                    tag_id.clone(),
468                    tag_name.clone(),
469                )
470                .await?;
471            }
472        }
473    }
474
475    // Check if any tags in the database have been removed via Mailchimp
476    for db_tag in db_tags.iter() {
477        let db_tag_id = db_tag
478            .get("id")
479            .and_then(|v| v.as_str())
480            .map(|v| v.trim().to_string())
481            .unwrap_or_default();
482
483        // Check if this tag exists in the Mailchimp tags
484        if !mailchimp_tags
485            .iter()
486            .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
487        {
488            if db_tag_id.is_empty() {
489                warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
490                continue;
491            }
492            headless_lms_models::marketing_consents::delete_tag(
493                conn,
494                db_tag_id.clone(),
495                course_language_group_id,
496            )
497            .await?;
498        }
499    }
500
501    Ok(())
502}
503
504/// Synchronizes the user contacts with Mailchimp.
505/// Added a boolean flag to determine whether to process unsubscribes.
506async fn sync_contacts(
507    conn: &mut PgConnection,
508    _config: &SyncerConfig,
509    process_unsubscribes: bool,
510) -> anyhow::Result<()> {
511    let access_tokens =
512        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
513            conn,
514        )
515        .await?;
516
517    let mut successfully_synced_user_ids = Vec::new();
518
519    // Iterate through tokens and fetch and send user details to Mailchimp
520    for token in access_tokens {
521        // Fetch all users from Mailchimp and sync possible changes locally
522        if process_unsubscribes {
523            let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
524                &token.mailchimp_mailing_list_id,
525                &token.server_prefix,
526                &token.access_token,
527                1000,
528            )
529            .await?;
530
531            info!(
532                "Processing Mailchimp data for list: {}",
533                token.mailchimp_mailing_list_id
534            );
535
536            process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
537        }
538
539        // Fetch unsynced emails and update them in Mailchimp
540        let users_with_unsynced_emails =
541            headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
542                conn,
543                token.course_language_group_id,
544            )
545            .await?;
546
547        info!(
548            "Found {} unsynced user email(s) for course language group: {}",
549            users_with_unsynced_emails.len(),
550            token.course_language_group_id
551        );
552
553        if !users_with_unsynced_emails.is_empty() {
554            let email_synced_user_ids = update_emails_in_mailchimp(
555                users_with_unsynced_emails,
556                &token.mailchimp_mailing_list_id,
557                &token.server_prefix,
558                &token.access_token,
559            )
560            .await?;
561
562            // Store the successfully synced user IDs from updating emails
563            successfully_synced_user_ids.extend(email_synced_user_ids);
564        }
565
566        let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;
567
568        // Fetch unsynced user consents and update them in Mailchimp
569        let unsynced_users_details =
570            headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
571                conn,
572                token.course_language_group_id,
573            )
574            .await?;
575
576        info!(
577            "Found {} unsynced user consent(s) for course language group: {}",
578            unsynced_users_details.len(),
579            token.course_language_group_id
580        );
581
582        if !unsynced_users_details.is_empty() {
583            let consent_synced_user_ids =
584                send_users_to_mailchimp(conn, token, unsynced_users_details, tag_objects).await?;
585
586            // Store the successfully synced user IDs from syncing user consents
587            successfully_synced_user_ids.extend(consent_synced_user_ids);
588        }
589    }
590
591    // If there are any successfully synced users, update the database to mark them as synced
592    if !successfully_synced_user_ids.is_empty() {
593        match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
594        conn,
595        &successfully_synced_user_ids,
596    )
597    .await
598    {
599        Ok(_) => {
600            info!(
601                "Successfully updated synced status for {} users.",
602                successfully_synced_user_ids.len()
603            );
604        }
605        Err(e) => {
606            error!(
607                "Failed to update synced status for {} users: {:?}",
608                successfully_synced_user_ids.len(),
609                e
610            );
611        }
612    }
613    }
614
615    Ok(())
616}
617
618/// Sends a batch of users to Mailchimp for synchronization.
619pub async fn send_users_to_mailchimp(
620    conn: &mut PgConnection,
621    token: MarketingMailingListAccessToken,
622    users_details: Vec<UserMarketingConsentWithDetails>,
623    tag_objects: Vec<serde_json::Value>,
624) -> anyhow::Result<Vec<Uuid>> {
625    let mut users_data_in_json = vec![];
626    let mut user_ids = vec![];
627    let mut successfully_synced_user_ids = Vec::new();
628    let mut user_id_contact_id_pairs = Vec::new();
629
630    // Prepare each user's data for Mailchimp
631    for user in &users_details {
632        // Check user has given permission to send data to mailchimp
633        if let Some(ref subscription) = user.email_subscription_in_mailchimp {
634            if subscription == "subscribed" {
635                let user_details = json!({
636                    "email_address": user.email,
637                    "status": user.email_subscription_in_mailchimp,
638                    "merge_fields": {
639                        "FNAME": user.first_name.clone().unwrap_or("".to_string()),
640                        "LNAME": user.last_name.clone().unwrap_or("".to_string()),
641                        "MARKETING": if user.consent { "allowed" } else { "disallowed" },
642                        "LOCALE": user.locale,
643                        // If the course is not completed, we pass an empty string to mailchimp to remove the value
644                        "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
645                        "USERID": user.user_id,
646                        "COURSEID": user.course_id,
647                        "LANGGRPID": user.course_language_group_id,
648                        "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
649                        "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
650                    },
651                   "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
652                });
653                users_data_in_json.push(user_details);
654                user_ids.push(user.id);
655            }
656        }
657    }
658
659    let batch_request = json!({
660        "members": users_data_in_json,
661        "update_existing":true
662    });
663
664    let url = format!(
665        "https://{}.api.mailchimp.com/3.0/lists/{}",
666        token.server_prefix, token.mailchimp_mailing_list_id
667    );
668
669    // Check if batch is empty before sending
670    if users_data_in_json.is_empty() {
671        info!("No new users to sync.");
672        return Ok(vec![]);
673    }
674
675    // Send the batch request to Mailchimp
676    let response = REQWEST_CLIENT
677        .post(&url)
678        .header("Content-Type", "application/json")
679        .header("Authorization", format!("apikey {}", token.access_token))
680        .json(&batch_request)
681        .send()
682        .await?;
683
684    if response.status().is_success() {
685        let response_data: serde_json::Value = response.json().await?;
686
687        // Iterate over both new_members and updated_members to extract user_ids and contact_ids
688        for key in &["new_members", "updated_members"] {
689            if let Some(members) = response_data[key].as_array() {
690                for member in members {
691                    if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
692                        if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
693                            successfully_synced_user_ids.push(uuid);
694                        }
695                        if let Some(contact_id) = member["contact_id"].as_str() {
696                            user_id_contact_id_pairs
697                                .push((user_id.to_string(), contact_id.to_string()));
698                        }
699                    }
700                }
701            }
702        }
703        // Update the users contact_id from Mailchimp to the database as user_mailchimp_id
704        headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
705            conn,
706            user_id_contact_id_pairs,
707        )
708        .await?;
709
710        // Return the list of successfully synced user_ids
711        Ok(successfully_synced_user_ids)
712    } else {
713        let status = response.status();
714        let error_text = response.text().await?;
715        Err(anyhow::anyhow!(
716            "Error syncing users to Mailchimp. Status: {}. Error: {}",
717            status,
718            error_text
719        ))
720    }
721}
722
723/// Updates the email addresses of multiple users in a Mailchimp mailing list.
724async fn update_emails_in_mailchimp(
725    users: Vec<UserEmailSubscription>,
726    list_id: &str,
727    server_prefix: &str,
728    access_token: &str,
729) -> anyhow::Result<Vec<Uuid>> {
730    let mut successfully_synced_user_ids = Vec::new();
731    let mut failed_user_ids = Vec::new();
732
733    for user in users {
734        if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
735            if let Some(ref status) = user.email_subscription_in_mailchimp {
736                if status != "subscribed" {
737                    continue; // Skip this user if they are not subscribed because Mailchimp only updates emails that are subscribed
738                }
739            }
740
741            let url = format!(
742                "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
743                server_prefix, list_id, user_mailchimp_id
744            );
745
746            // Prepare the body for the PUT request
747            let body = serde_json::json!({
748                "email_address": &user.email,
749                "status": &user.email_subscription_in_mailchimp,
750            });
751
752            // Update the email
753            let update_response = REQWEST_CLIENT
754                .put(&url)
755                .header("Authorization", format!("apikey {}", access_token))
756                .json(&body)
757                .send()
758                .await?;
759
760            if update_response.status().is_success() {
761                successfully_synced_user_ids.push(user.user_id);
762            } else {
763                failed_user_ids.push(user.user_id);
764            }
765        } else {
766            continue;
767        }
768    }
769
770    if !failed_user_ids.is_empty() {
771        info!("Failed to update the following users:");
772        for user_id in &failed_user_ids {
773            error!("User ID: {}", user_id);
774        }
775    }
776
777    Ok(successfully_synced_user_ids)
778}
779
780/// Fetches data from Mailchimp in chunks.
781async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
782    list_id: &str,
783    server_prefix: &str,
784    access_token: &str,
785    chunk_size: usize,
786) -> anyhow::Result<Vec<(String, String, String, String)>> {
787    let mut all_data = Vec::new();
788    let mut offset = 0;
789
790    loop {
791        let url = format!(
792            "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
793            server_prefix, list_id, offset, chunk_size
794        );
795
796        let response = REQWEST_CLIENT
797            .get(&url)
798            .header("Authorization", format!("apikey {}", access_token))
799            .send()
800            .await?
801            .json::<serde_json::Value>()
802            .await?;
803
804        let empty_vec = vec![];
805        let members = response["members"].as_array().unwrap_or(&empty_vec);
806        if members.is_empty() {
807            break;
808        }
809
810        for member in members {
811            // Process the member, but only if necessary fields are present and valid
812            if let (Some(status), Some(last_changed), Some(merge_fields)) = (
813                member["status"].as_str(),
814                member["last_changed"].as_str(),
815                member["merge_fields"].as_object(),
816            ) {
817                // Ensure both USERID and LANGGRPID are present and valid
818                if let (Some(user_id), Some(language_group_id)) = (
819                    merge_fields.get("USERID").and_then(|v| v.as_str()),
820                    merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
821                ) {
822                    // Avoid adding data if any field is missing or empty
823                    if !user_id.is_empty() && !language_group_id.is_empty() {
824                        all_data.push((
825                            user_id.to_string(),
826                            last_changed.to_string(),
827                            language_group_id.to_string(),
828                            status.to_string(),
829                        ));
830                    }
831                }
832            }
833        }
834
835        // Check the pagination info from the response
836        let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
837        if offset + chunk_size >= total_items {
838            break;
839        }
840
841        offset += chunk_size;
842    }
843
844    Ok(all_data)
845}
846
847const BATCH_SIZE: usize = 1000;
848
849async fn process_unsubscribed_users_from_mailchimp(
850    conn: &mut PgConnection,
851    mailchimp_data: Vec<(String, String, String, String)>,
852) -> anyhow::Result<()> {
853    // Log the total size of the Mailchimp data
854    let total_records = mailchimp_data.len();
855
856    for chunk in mailchimp_data.chunks(BATCH_SIZE) {
857        if chunk.is_empty() {
858            continue;
859        }
860
861        // Attempt to process the current chunk
862        if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
863            conn,
864            chunk.to_vec(),
865        )
866        .await
867        {
868            error!(
869                "Error while processing chunk {}/{}: ",
870                total_records.div_ceil(BATCH_SIZE),
871                e
872            );
873        }
874    }
875
876    Ok(())
877}