headless_lms_server/programs/
mailchimp_syncer.rs

1use crate::setup_tracing;
2use dotenv::dotenv;
3use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
4use headless_lms_models::marketing_consents::UserEmailSubscription;
5use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
6use headless_lms_utils::http::REQWEST_CLIENT;
7use serde::Deserialize;
8use serde_json::json;
9use sqlx::{PgConnection, PgPool};
10use std::{
11    env,
12    time::{Duration, Instant},
13};
14use uuid::Uuid;
15
16#[derive(Debug, Deserialize)]
17struct MailchimpField {
18    field_id: String,
19    field_name: String,
20}
21
/// Describes one merge field that every synced Mailchimp list is expected to have.
#[derive(Debug)]
struct FieldSchema {
    /// Merge tag; sent as `tag` when the field is created and used to detect existing fields.
    tag: &'static str,
    /// Human readable name; sent as `name` when the field is created.
    name: &'static str,
    /// Sent as `default_value` when the field is created.
    default_value: &'static str,
}

/// Merge fields that `ensure_mailchimp_schema` creates on a list when they are missing.
///
/// The defaults are only applied when a field is created; fields that already exist on a list
/// are left as they are.
const REQUIRED_FIELDS: &[FieldSchema] = &[
    FieldSchema {
        tag: "FNAME",
        name: "First Name",
        default_value: "",
    },
    FieldSchema {
        tag: "LNAME",
        name: "Last Name",
        default_value: "",
    },
    FieldSchema {
        tag: "MARKETING",
        name: "Accepts Marketing",
        default_value: "disallowed",
    },
    FieldSchema {
        tag: "LOCALE",
        name: "Locale",
        default_value: "en",
    },
    FieldSchema {
        tag: "GRADUATED",
        name: "Graduated",
        default_value: "",
    },
    FieldSchema {
        tag: "COURSEID",
        name: "Course ID",
        default_value: "",
    },
    FieldSchema {
        tag: "LANGGRPID",
        name: "Course language Group ID",
        default_value: "",
    },
    FieldSchema {
        tag: "USERID",
        name: "User ID",
        default_value: "",
    },
    FieldSchema {
        tag: "RESEARCH",
        name: "Research consent",
        // The syncer only ever writes "allowed" / "disallowed" into RESEARCH (the same
        // vocabulary as MARKETING), so the default uses that vocabulary too instead of "false".
        default_value: "disallowed",
    },
];
76
/// Merge tags that are never removed during schema clean-up even though they are not listed in
/// `REQUIRED_FIELDS`.
const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
/// When `true`, merge fields that are neither required nor excluded are deleted by the schema
/// check.
const REMOVE_UNSUPPORTED_FIELDS: bool = false;
/// Minimum number of seconds between two unsubscribe-processing passes (three hours).
const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 3 * 60 * 60;

/// Number of seconds between two iterations of the sync loop.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of loop iterations between two "Still syncing." log messages.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
84
85/// The main function that initializes environment variables, config, and sync process.
86pub async fn main() -> anyhow::Result<()> {
87    initialize_environment()?;
88
89    let config = initialize_configuration().await?;
90
91    let db_pool = initialize_database_pool(&config.database_url).await?;
92    let mut conn = db_pool.acquire().await?;
93
94    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
95    let mut ticks = 0;
96
97    let access_tokens =
98        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
99            &mut conn,
100        )
101        .await?;
102
103    // Iterate through access tokens and ensure Mailchimp schema is set up
104    for token in &access_tokens {
105        if let Err(e) = ensure_mailchimp_schema(
106            &token.mailchimp_mailing_list_id,
107            &token.server_prefix,
108            &token.access_token,
109        )
110        .await
111        {
112            error!(
113                "Failed to set up Mailchimp schema for list '{}': {:?}",
114                token.mailchimp_mailing_list_id, e
115            );
116            return Err(e);
117        }
118    }
119
120    info!("Starting mailchimp syncer.");
121
122    let mut last_time_unsubscribes_processed = Instant::now();
123    let mut last_time_tags_synced = Instant::now();
124
125    loop {
126        interval.tick().await;
127        ticks += 1;
128
129        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
130            ticks = 0;
131            info!("Still syncing.");
132        }
133        let mut process_unsubscribes = false;
134        if last_time_unsubscribes_processed.elapsed().as_secs()
135            >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
136        {
137            process_unsubscribes = true;
138            last_time_unsubscribes_processed = Instant::now();
139        };
140
141        // Check and sync tags for this access token once every hour
142        if last_time_tags_synced.elapsed().as_secs() >= 3600 {
143            info!("Syncing tags from Mailchimp...");
144            for token in &access_tokens {
145                if let Err(e) = sync_tags_from_mailchimp(
146                    &mut conn,
147                    &token.mailchimp_mailing_list_id,
148                    &token.access_token,
149                    &token.server_prefix,
150                    token.id,
151                    token.course_language_group_id,
152                )
153                .await
154                {
155                    error!(
156                        "Failed to sync tags for list '{}': {:?}",
157                        token.mailchimp_mailing_list_id, e
158                    );
159                }
160            }
161            last_time_tags_synced = Instant::now();
162        }
163
164        if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
165            error!("Error during synchronization: {:?}", e);
166        }
167    }
168}
169
170/// Initializes environment variables, logging, and tracing setup.
171fn initialize_environment() -> anyhow::Result<()> {
172    // TODO: Audit that the environment access only happens in single-threaded code.
173    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
174    dotenv().ok();
175    setup_tracing()?;
176    Ok(())
177}
178
/// Runtime configuration of the syncer.
struct SyncerConfig {
    /// Connection URL handed to `initialize_database_pool`.
    database_url: String,
}
183
184/// Initializes and returns configuration settings (database URL).
185async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
186    let database_url = env::var("DATABASE_URL")
187        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
188
189    Ok(SyncerConfig { database_url })
190}
191
192/// Initializes the PostgreSQL connection pool from the provided database URL.
193async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
194    PgPool::connect(database_url).await.map_err(|e| {
195        anyhow::anyhow!(
196            "Failed to connect to the database at {}: {:?}",
197            database_url,
198            e
199        )
200    })
201}
202
203/// Ensures the Mailchimp schema is up to date, adding required fields and removing any extra ones.
204async fn ensure_mailchimp_schema(
205    list_id: &str,
206    server_prefix: &str,
207    access_token: &str,
208) -> anyhow::Result<()> {
209    let existing_fields =
210        fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
211
212    if REMOVE_UNSUPPORTED_FIELDS {
213        // Remove extra fields not in REQUIRED_FIELDS or FIELDS_EXCLUDED_FROM_REMOVING
214        for field in existing_fields.iter() {
215            if !REQUIRED_FIELDS
216                .iter()
217                .any(|r| r.tag == field.field_name.as_str())
218                && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
219            {
220                match remove_field_from_mailchimp(
221                    list_id,
222                    &field.field_id,
223                    server_prefix,
224                    access_token,
225                )
226                .await
227                {
228                    Err(e) => {
229                        warn!("Could not remove field '{}': {}", field.field_name, e);
230                    }
231                    _ => {
232                        info!("Removed field '{}'", field.field_name);
233                    }
234                }
235            }
236        }
237    }
238
239    // Add any required fields that are missing
240    for required_field in REQUIRED_FIELDS.iter() {
241        if !existing_fields
242            .iter()
243            .any(|f| f.field_name == required_field.tag)
244        {
245            match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
246            {
247                Err(e) => {
248                    warn!(
249                        "Failed to add required field '{}': {}",
250                        required_field.name, e
251                    );
252                }
253                _ => {
254                    info!(
255                        "Successfully added required field '{}'",
256                        required_field.name
257                    );
258                }
259            }
260        } else {
261            info!(
262                "Field '{}' already exists, skipping addition.",
263                required_field.name
264            );
265        }
266    }
267
268    Ok(())
269}
270
271/// Fetches the current merge fields from the Mailchimp list schema.
272async fn fetch_current_mailchimp_fields(
273    list_id: &str,
274    server_prefix: &str,
275    access_token: &str,
276) -> Result<Vec<MailchimpField>, anyhow::Error> {
277    let url = format!(
278        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
279        server_prefix, list_id
280    );
281
282    let response = REQWEST_CLIENT
283        .get(&url)
284        .header("Authorization", format!("apikey {}", access_token))
285        .send()
286        .await?;
287
288    if response.status().is_success() {
289        let json = response.json::<serde_json::Value>().await?;
290
291        let fields: Vec<MailchimpField> = json["merge_fields"]
292            .as_array()
293            .unwrap_or(&vec![])
294            .iter()
295            .filter_map(|field| {
296                let field_id = field["merge_id"].as_u64();
297                let field_name = field["tag"].as_str();
298
299                if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
300                    Some(MailchimpField {
301                        field_id: field_id.to_string(),
302                        field_name: field_name.to_string(),
303                    })
304                } else {
305                    None
306                }
307            })
308            .collect();
309
310        Ok(fields)
311    } else {
312        let error_text = response.text().await?;
313        error!("Error fetching merge fields: {}", error_text);
314        Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
315    }
316}
317
318/// Adds a new merge field to the Mailchimp list.
319async fn add_field_to_mailchimp(
320    list_id: &str,
321    field_schema: &FieldSchema,
322    server_prefix: &str,
323    access_token: &str,
324) -> anyhow::Result<()> {
325    let url = format!(
326        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
327        server_prefix, list_id
328    );
329
330    let body = json!({
331        "tag": field_schema.tag,
332        "name": field_schema.name,
333        "type": "text",
334        "default_value": field_schema.default_value,
335    });
336
337    let response = REQWEST_CLIENT
338        .post(&url)
339        .header("Authorization", format!("apikey {}", access_token))
340        .json(&body)
341        .send()
342        .await?;
343
344    if response.status().is_success() {
345        Ok(())
346    } else {
347        let status = response.status();
348        let error_text = response
349            .text()
350            .await
351            .unwrap_or_else(|_| "No additional error info.".to_string());
352        Err(anyhow::anyhow!(
353            "Failed to add field to Mailchimp. Status: {}. Error: {}",
354            status,
355            error_text
356        ))
357    }
358}
359
360/// Removes a merge field from the Mailchimp list by with a field ID.
361async fn remove_field_from_mailchimp(
362    list_id: &str,
363    field_id: &str,
364    server_prefix: &str,
365    access_token: &str,
366) -> anyhow::Result<()> {
367    let url = format!(
368        "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
369        server_prefix, list_id, field_id
370    );
371
372    let response = REQWEST_CLIENT
373        .delete(&url)
374        .header("Authorization", format!("apikey {}", access_token))
375        .send()
376        .await?;
377
378    if response.status().is_success() {
379        Ok(())
380    } else {
381        let status = response.status();
382        let error_text = response
383            .text()
384            .await
385            .unwrap_or_else(|_| "No additional error info.".to_string());
386        Err(anyhow::anyhow!(
387            "Failed to remove field from Mailchimp. Status: {}. Error: {}",
388            status,
389            error_text
390        ))
391    }
392}
393
394/// Fetch tags from mailchimp and sync the changes to the database
395pub async fn sync_tags_from_mailchimp(
396    conn: &mut PgConnection,
397    list_id: &str,
398    access_token: &str,
399    server_prefix: &str,
400    marketing_mailing_list_access_token_id: Uuid,
401    course_language_group_id: Uuid,
402) -> anyhow::Result<()> {
403    let url = format!(
404        "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
405        server_prefix, list_id
406    );
407
408    let response = REQWEST_CLIENT
409        .get(&url)
410        .header("Authorization", format!("apikey {}", access_token))
411        .send()
412        .await?;
413
414    // Extract tags from the response
415    let response_json = response.json::<serde_json::Value>().await?;
416
417    let mailchimp_tags = match response_json.get("tags") {
418        Some(tags) if tags.is_array() => tags
419            .as_array()
420            .unwrap()
421            .iter()
422            .filter_map(|tag| {
423                let name = tag.get("name")?.as_str()?.to_string();
424
425                let id = match tag.get("id") {
426                    Some(serde_json::Value::Number(num)) => num.to_string(),
427                    Some(serde_json::Value::String(str_id)) => str_id.clone(),
428                    _ => return None,
429                };
430
431                Some((id, name))
432            })
433            .collect::<Vec<(String, String)>>(),
434        _ => {
435            warn!("No tags found for list '{}', skipping sync.", list_id);
436            return Ok(());
437        }
438    };
439
440    // Fetch the current tags from the database
441    let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
442            conn,
443            course_language_group_id,
444            marketing_mailing_list_access_token_id,
445        )
446        .await?;
447
448    // Check if any tags from Mailchimp are renamed
449    for (tag_id, tag_name) in &mailchimp_tags {
450        if let Some(db_tag) = db_tags.iter().find(|db_tag| {
451            db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
452        }) {
453            let db_tag_name = db_tag
454                .get("tag_name")
455                .and_then(|v| v.as_str())
456                .unwrap_or_default();
457            if db_tag_name != tag_name {
458                headless_lms_models::marketing_consents::upsert_tag(
459                    conn,
460                    course_language_group_id,
461                    marketing_mailing_list_access_token_id,
462                    tag_id.clone(),
463                    tag_name.clone(),
464                )
465                .await?;
466            }
467        }
468    }
469
470    // Check if any tags in the database have been removed via Mailchimp
471    for db_tag in db_tags.iter() {
472        let db_tag_id = db_tag
473            .get("id")
474            .and_then(|v| v.as_str())
475            .map(|v| v.trim().to_string())
476            .unwrap_or_default();
477
478        // Check if this tag exists in the Mailchimp tags
479        if !mailchimp_tags
480            .iter()
481            .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
482        {
483            if db_tag_id.is_empty() {
484                warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
485                continue;
486            }
487            headless_lms_models::marketing_consents::delete_tag(
488                conn,
489                db_tag_id.clone(),
490                course_language_group_id,
491            )
492            .await?;
493        }
494    }
495
496    Ok(())
497}
498
499/// Synchronizes the user contacts with Mailchimp.
500/// Added a boolean flag to determine whether to process unsubscribes.
501async fn sync_contacts(
502    conn: &mut PgConnection,
503    _config: &SyncerConfig,
504    process_unsubscribes: bool,
505) -> anyhow::Result<()> {
506    let access_tokens =
507        headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
508            conn,
509        )
510        .await?;
511
512    let mut successfully_synced_user_ids = Vec::new();
513
514    // Iterate through tokens and fetch and send user details to Mailchimp
515    for token in access_tokens {
516        // Fetch all users from Mailchimp and sync possible changes locally
517        if process_unsubscribes {
518            let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
519                &token.mailchimp_mailing_list_id,
520                &token.server_prefix,
521                &token.access_token,
522                1000,
523            )
524            .await?;
525
526            info!(
527                "Processing Mailchimp data for list: {}",
528                token.mailchimp_mailing_list_id
529            );
530
531            process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
532        }
533
534        // Fetch unsynced emails and update them in Mailchimp
535        let users_with_unsynced_emails =
536            headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
537                conn,
538                token.course_language_group_id,
539            )
540            .await?;
541
542        info!(
543            "Found {} unsynced user email(s) for course language group: {}",
544            users_with_unsynced_emails.len(),
545            token.course_language_group_id
546        );
547
548        if !users_with_unsynced_emails.is_empty() {
549            let email_synced_user_ids = update_emails_in_mailchimp(
550                users_with_unsynced_emails,
551                &token.mailchimp_mailing_list_id,
552                &token.server_prefix,
553                &token.access_token,
554            )
555            .await?;
556
557            // Store the successfully synced user IDs from updating emails
558            successfully_synced_user_ids.extend(email_synced_user_ids);
559        }
560
561        let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;
562
563        // Fetch unsynced user consents and update them in Mailchimp
564        let unsynced_users_details =
565            headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
566                conn,
567                token.course_language_group_id,
568            )
569            .await?;
570
571        info!(
572            "Found {} unsynced user consent(s) for course language group: {}",
573            unsynced_users_details.len(),
574            token.course_language_group_id
575        );
576
577        if !unsynced_users_details.is_empty() {
578            let consent_synced_user_ids =
579                send_users_to_mailchimp(conn, token, unsynced_users_details, tag_objects).await?;
580
581            // Store the successfully synced user IDs from syncing user consents
582            successfully_synced_user_ids.extend(consent_synced_user_ids);
583        }
584    }
585
586    // If there are any successfully synced users, update the database to mark them as synced
587    if !successfully_synced_user_ids.is_empty() {
588        match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
589        conn,
590        &successfully_synced_user_ids,
591    )
592    .await
593    {
594        Ok(_) => {
595            info!(
596                "Successfully updated synced status for {} users.",
597                successfully_synced_user_ids.len()
598            );
599        }
600        Err(e) => {
601            error!(
602                "Failed to update synced status for {} users: {:?}",
603                successfully_synced_user_ids.len(),
604                e
605            );
606        }
607    }
608    }
609
610    Ok(())
611}
612
613/// Sends a batch of users to Mailchimp for synchronization.
614pub async fn send_users_to_mailchimp(
615    conn: &mut PgConnection,
616    token: MarketingMailingListAccessToken,
617    users_details: Vec<UserMarketingConsentWithDetails>,
618    tag_objects: Vec<serde_json::Value>,
619) -> anyhow::Result<Vec<Uuid>> {
620    let mut users_data_in_json = vec![];
621    let mut user_ids = vec![];
622    let mut successfully_synced_user_ids = Vec::new();
623    let mut user_id_contact_id_pairs = Vec::new();
624
625    // Prepare each user's data for Mailchimp
626    for user in &users_details {
627        // Check user has given permission to send data to mailchimp
628        if let Some(ref subscription) = user.email_subscription_in_mailchimp {
629            if subscription == "subscribed" {
630                let user_details = json!({
631                    "email_address": user.email,
632                    "status": user.email_subscription_in_mailchimp,
633                    "merge_fields": {
634                        "FNAME": user.first_name.clone().unwrap_or("".to_string()),
635                        "LNAME": user.last_name.clone().unwrap_or("".to_string()),
636                        "MARKETING": if user.consent { "allowed" } else { "disallowed" },
637                        "LOCALE": user.locale,
638                        // If the course is not completed, we pass an empty string to mailchimp to remove the value
639                        "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
640                        "USERID": user.user_id,
641                        "COURSEID": user.course_id,
642                        "LANGGRPID": user.course_language_group_id,
643                        "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
644                        "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
645                    },
646                   "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
647                });
648                users_data_in_json.push(user_details);
649                user_ids.push(user.id);
650            }
651        }
652    }
653
654    let batch_request = json!({
655        "members": users_data_in_json,
656        "update_existing":true
657    });
658
659    let url = format!(
660        "https://{}.api.mailchimp.com/3.0/lists/{}",
661        token.server_prefix, token.mailchimp_mailing_list_id
662    );
663
664    // Check if batch is empty before sending
665    if users_data_in_json.is_empty() {
666        info!("No new users to sync.");
667        return Ok(vec![]);
668    }
669
670    // Send the batch request to Mailchimp
671    let response = REQWEST_CLIENT
672        .post(&url)
673        .header("Content-Type", "application/json")
674        .header("Authorization", format!("apikey {}", token.access_token))
675        .json(&batch_request)
676        .send()
677        .await?;
678
679    if response.status().is_success() {
680        let response_data: serde_json::Value = response.json().await?;
681
682        // Iterate over both new_members and updated_members to extract user_ids and contact_ids
683        for key in &["new_members", "updated_members"] {
684            if let Some(members) = response_data[key].as_array() {
685                for member in members {
686                    if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
687                        if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
688                            successfully_synced_user_ids.push(uuid);
689                        }
690                        if let Some(contact_id) = member["contact_id"].as_str() {
691                            user_id_contact_id_pairs
692                                .push((user_id.to_string(), contact_id.to_string()));
693                        }
694                    }
695                }
696            }
697        }
698        // Update the users contact_id from Mailchimp to the database as user_mailchimp_id
699        headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
700            conn,
701            user_id_contact_id_pairs,
702        )
703        .await?;
704
705        // Return the list of successfully synced user_ids
706        Ok(successfully_synced_user_ids)
707    } else {
708        let status = response.status();
709        let error_text = response.text().await?;
710        Err(anyhow::anyhow!(
711            "Error syncing users to Mailchimp. Status: {}. Error: {}",
712            status,
713            error_text
714        ))
715    }
716}
717
718/// Updates the email addresses of multiple users in a Mailchimp mailing list.
719async fn update_emails_in_mailchimp(
720    users: Vec<UserEmailSubscription>,
721    list_id: &str,
722    server_prefix: &str,
723    access_token: &str,
724) -> anyhow::Result<Vec<Uuid>> {
725    let mut successfully_synced_user_ids = Vec::new();
726    let mut failed_user_ids = Vec::new();
727
728    for user in users {
729        if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
730            if let Some(ref status) = user.email_subscription_in_mailchimp {
731                if status != "subscribed" {
732                    continue; // Skip this user if they are not subscribed because Mailchimp only updates emails that are subscribed
733                }
734            }
735
736            let url = format!(
737                "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
738                server_prefix, list_id, user_mailchimp_id
739            );
740
741            // Prepare the body for the PUT request
742            let body = serde_json::json!({
743                "email_address": &user.email,
744                "status": &user.email_subscription_in_mailchimp,
745            });
746
747            // Update the email
748            let update_response = REQWEST_CLIENT
749                .put(&url)
750                .header("Authorization", format!("apikey {}", access_token))
751                .json(&body)
752                .send()
753                .await?;
754
755            if update_response.status().is_success() {
756                successfully_synced_user_ids.push(user.user_id);
757            } else {
758                failed_user_ids.push(user.user_id);
759            }
760        } else {
761            continue;
762        }
763    }
764
765    if !failed_user_ids.is_empty() {
766        info!("Failed to update the following users:");
767        for user_id in &failed_user_ids {
768            error!("User ID: {}", user_id);
769        }
770    }
771
772    Ok(successfully_synced_user_ids)
773}
774
775/// Fetches data from Mailchimp in chunks.
776async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
777    list_id: &str,
778    server_prefix: &str,
779    access_token: &str,
780    chunk_size: usize,
781) -> anyhow::Result<Vec<(String, String, String, String)>> {
782    let mut all_data = Vec::new();
783    let mut offset = 0;
784
785    loop {
786        let url = format!(
787            "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
788            server_prefix, list_id, offset, chunk_size
789        );
790
791        let response = REQWEST_CLIENT
792            .get(&url)
793            .header("Authorization", format!("apikey {}", access_token))
794            .send()
795            .await?
796            .json::<serde_json::Value>()
797            .await?;
798
799        let empty_vec = vec![];
800        let members = response["members"].as_array().unwrap_or(&empty_vec);
801        if members.is_empty() {
802            break;
803        }
804
805        for member in members {
806            // Process the member, but only if necessary fields are present and valid
807            if let (Some(status), Some(last_changed), Some(merge_fields)) = (
808                member["status"].as_str(),
809                member["last_changed"].as_str(),
810                member["merge_fields"].as_object(),
811            ) {
812                // Ensure both USERID and LANGGRPID are present and valid
813                if let (Some(user_id), Some(language_group_id)) = (
814                    merge_fields.get("USERID").and_then(|v| v.as_str()),
815                    merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
816                ) {
817                    // Avoid adding data if any field is missing or empty
818                    if !user_id.is_empty() && !language_group_id.is_empty() {
819                        all_data.push((
820                            user_id.to_string(),
821                            last_changed.to_string(),
822                            language_group_id.to_string(),
823                            status.to_string(),
824                        ));
825                    }
826                }
827            }
828        }
829
830        // Check the pagination info from the response
831        let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
832        if offset + chunk_size >= total_items {
833            break;
834        }
835
836        offset += chunk_size;
837    }
838
839    Ok(all_data)
840}
841
842const BATCH_SIZE: usize = 1000;
843
844async fn process_unsubscribed_users_from_mailchimp(
845    conn: &mut PgConnection,
846    mailchimp_data: Vec<(String, String, String, String)>,
847) -> anyhow::Result<()> {
848    // Log the total size of the Mailchimp data
849    let total_records = mailchimp_data.len();
850
851    for chunk in mailchimp_data.chunks(BATCH_SIZE) {
852        if chunk.is_empty() {
853            continue;
854        }
855
856        // Attempt to process the current chunk
857        if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
858            conn,
859            chunk.to_vec(),
860        )
861        .await
862        {
863            error!(
864                "Error while processing chunk {}/{}: ",
865                total_records.div_ceil(BATCH_SIZE),
866                e
867            );
868        }
869    }
870
871    Ok(())
872}