1use crate::setup_tracing;
2use dotenv::dotenv;
3use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
4use headless_lms_models::marketing_consents::UserEmailSubscription;
5use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
6use headless_lms_utils::http::REQWEST_CLIENT;
7use serde::Deserialize;
8use serde_json::json;
9use sqlx::{PgConnection, PgPool};
10use std::{
11 env,
12 time::{Duration, Instant},
13};
14use uuid::Uuid;
15
16#[derive(Debug, Deserialize)]
17struct MailchimpField {
18 field_id: String,
19 field_name: String,
20}
21
22#[derive(Debug)]
23struct FieldSchema {
24 tag: &'static str,
25 name: &'static str,
26 default_value: &'static str,
27}
28
29const REQUIRED_FIELDS: &[FieldSchema] = &[
30 FieldSchema {
31 tag: "FNAME",
32 name: "First Name",
33 default_value: "",
34 },
35 FieldSchema {
36 tag: "LNAME",
37 name: "Last Name",
38 default_value: "",
39 },
40 FieldSchema {
41 tag: "MARKETING",
42 name: "Accepts Marketing",
43 default_value: "disallowed",
44 },
45 FieldSchema {
46 tag: "LOCALE",
47 name: "Locale",
48 default_value: "en",
49 },
50 FieldSchema {
51 tag: "GRADUATED",
52 name: "Graduated",
53 default_value: "",
54 },
55 FieldSchema {
56 tag: "COURSEID",
57 name: "Course ID",
58 default_value: "",
59 },
60 FieldSchema {
61 tag: "LANGGRPID",
62 name: "Course language Group ID",
63 default_value: "",
64 },
65 FieldSchema {
66 tag: "USERID",
67 name: "User ID",
68 default_value: "",
69 },
70 FieldSchema {
71 tag: "RESEARCH",
72 name: "Research consent",
73 default_value: "false",
74 },
75];
76
77const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
79const REMOVE_UNSUPPORTED_FIELDS: bool = false;
80const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 10_800;
81
82const SYNC_INTERVAL_SECS: u64 = 10;
83const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
84
85pub async fn main() -> anyhow::Result<()> {
87 initialize_environment()?;
88
89 let config = initialize_configuration().await?;
90
91 let db_pool = initialize_database_pool(&config.database_url).await?;
92 let mut conn = db_pool.acquire().await?;
93
94 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
95 let mut ticks = 0;
96
97 let access_tokens =
98 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
99 &mut conn,
100 )
101 .await?;
102
103 for token in &access_tokens {
105 if let Err(e) = ensure_mailchimp_schema(
106 &token.mailchimp_mailing_list_id,
107 &token.server_prefix,
108 &token.access_token,
109 )
110 .await
111 {
112 error!(
113 "Failed to set up Mailchimp schema for list '{}': {:?}",
114 token.mailchimp_mailing_list_id, e
115 );
116 return Err(e);
117 }
118 }
119
120 info!("Starting mailchimp syncer.");
121
122 let mut last_time_unsubscribes_processed = Instant::now();
123 let mut last_time_tags_synced = Instant::now();
124
125 loop {
126 interval.tick().await;
127 ticks += 1;
128
129 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
130 ticks = 0;
131 info!("Still syncing.");
132 }
133 let mut process_unsubscribes = false;
134 if last_time_unsubscribes_processed.elapsed().as_secs()
135 >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
136 {
137 process_unsubscribes = true;
138 last_time_unsubscribes_processed = Instant::now();
139 };
140
141 if last_time_tags_synced.elapsed().as_secs() >= 3600 {
143 info!("Syncing tags from Mailchimp...");
144 for token in &access_tokens {
145 if let Err(e) = sync_tags_from_mailchimp(
146 &mut conn,
147 &token.mailchimp_mailing_list_id,
148 &token.access_token,
149 &token.server_prefix,
150 token.id,
151 token.course_language_group_id,
152 )
153 .await
154 {
155 error!(
156 "Failed to sync tags for list '{}': {:?}",
157 token.mailchimp_mailing_list_id, e
158 );
159 }
160 }
161 last_time_tags_synced = Instant::now();
162 }
163
164 if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
165 error!("Error during synchronization: {:?}", e);
166 }
167 }
168}
169
170fn initialize_environment() -> anyhow::Result<()> {
172 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
174 dotenv().ok();
175 setup_tracing()?;
176 Ok(())
177}
178
179struct SyncerConfig {
181 database_url: String,
182}
183
184async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
186 let database_url = env::var("DATABASE_URL")
187 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
188
189 Ok(SyncerConfig { database_url })
190}
191
192async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
194 PgPool::connect(database_url).await.map_err(|e| {
195 anyhow::anyhow!(
196 "Failed to connect to the database at {}: {:?}",
197 database_url,
198 e
199 )
200 })
201}
202
203async fn ensure_mailchimp_schema(
205 list_id: &str,
206 server_prefix: &str,
207 access_token: &str,
208) -> anyhow::Result<()> {
209 let existing_fields =
210 fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
211
212 if REMOVE_UNSUPPORTED_FIELDS {
213 for field in existing_fields.iter() {
215 if !REQUIRED_FIELDS
216 .iter()
217 .any(|r| r.tag == field.field_name.as_str())
218 && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
219 {
220 match remove_field_from_mailchimp(
221 list_id,
222 &field.field_id,
223 server_prefix,
224 access_token,
225 )
226 .await
227 {
228 Err(e) => {
229 warn!("Could not remove field '{}': {}", field.field_name, e);
230 }
231 _ => {
232 info!("Removed field '{}'", field.field_name);
233 }
234 }
235 }
236 }
237 }
238
239 for required_field in REQUIRED_FIELDS.iter() {
241 if !existing_fields
242 .iter()
243 .any(|f| f.field_name == required_field.tag)
244 {
245 match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
246 {
247 Err(e) => {
248 warn!(
249 "Failed to add required field '{}': {}",
250 required_field.name, e
251 );
252 }
253 _ => {
254 info!(
255 "Successfully added required field '{}'",
256 required_field.name
257 );
258 }
259 }
260 } else {
261 info!(
262 "Field '{}' already exists, skipping addition.",
263 required_field.name
264 );
265 }
266 }
267
268 Ok(())
269}
270
271async fn fetch_current_mailchimp_fields(
273 list_id: &str,
274 server_prefix: &str,
275 access_token: &str,
276) -> Result<Vec<MailchimpField>, anyhow::Error> {
277 let url = format!(
278 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
279 server_prefix, list_id
280 );
281
282 let response = REQWEST_CLIENT
283 .get(&url)
284 .header("Authorization", format!("apikey {}", access_token))
285 .send()
286 .await?;
287
288 if response.status().is_success() {
289 let json = response.json::<serde_json::Value>().await?;
290
291 let fields: Vec<MailchimpField> = json["merge_fields"]
292 .as_array()
293 .unwrap_or(&vec![])
294 .iter()
295 .filter_map(|field| {
296 let field_id = field["merge_id"].as_u64();
297 let field_name = field["tag"].as_str();
298
299 if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
300 Some(MailchimpField {
301 field_id: field_id.to_string(),
302 field_name: field_name.to_string(),
303 })
304 } else {
305 None
306 }
307 })
308 .collect();
309
310 Ok(fields)
311 } else {
312 let error_text = response.text().await?;
313 error!("Error fetching merge fields: {}", error_text);
314 Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
315 }
316}
317
318async fn add_field_to_mailchimp(
320 list_id: &str,
321 field_schema: &FieldSchema,
322 server_prefix: &str,
323 access_token: &str,
324) -> anyhow::Result<()> {
325 let url = format!(
326 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
327 server_prefix, list_id
328 );
329
330 let body = json!({
331 "tag": field_schema.tag,
332 "name": field_schema.name,
333 "type": "text",
334 "default_value": field_schema.default_value,
335 });
336
337 let response = REQWEST_CLIENT
338 .post(&url)
339 .header("Authorization", format!("apikey {}", access_token))
340 .json(&body)
341 .send()
342 .await?;
343
344 if response.status().is_success() {
345 Ok(())
346 } else {
347 let status = response.status();
348 let error_text = response
349 .text()
350 .await
351 .unwrap_or_else(|_| "No additional error info.".to_string());
352 Err(anyhow::anyhow!(
353 "Failed to add field to Mailchimp. Status: {}. Error: {}",
354 status,
355 error_text
356 ))
357 }
358}
359
360async fn remove_field_from_mailchimp(
362 list_id: &str,
363 field_id: &str,
364 server_prefix: &str,
365 access_token: &str,
366) -> anyhow::Result<()> {
367 let url = format!(
368 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
369 server_prefix, list_id, field_id
370 );
371
372 let response = REQWEST_CLIENT
373 .delete(&url)
374 .header("Authorization", format!("apikey {}", access_token))
375 .send()
376 .await?;
377
378 if response.status().is_success() {
379 Ok(())
380 } else {
381 let status = response.status();
382 let error_text = response
383 .text()
384 .await
385 .unwrap_or_else(|_| "No additional error info.".to_string());
386 Err(anyhow::anyhow!(
387 "Failed to remove field from Mailchimp. Status: {}. Error: {}",
388 status,
389 error_text
390 ))
391 }
392}
393
394pub async fn sync_tags_from_mailchimp(
396 conn: &mut PgConnection,
397 list_id: &str,
398 access_token: &str,
399 server_prefix: &str,
400 marketing_mailing_list_access_token_id: Uuid,
401 course_language_group_id: Uuid,
402) -> anyhow::Result<()> {
403 let url = format!(
404 "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
405 server_prefix, list_id
406 );
407
408 let response = REQWEST_CLIENT
409 .get(&url)
410 .header("Authorization", format!("apikey {}", access_token))
411 .send()
412 .await?;
413
414 let response_json = response.json::<serde_json::Value>().await?;
416
417 let mailchimp_tags = match response_json.get("tags") {
418 Some(tags) if tags.is_array() => tags
419 .as_array()
420 .unwrap()
421 .iter()
422 .filter_map(|tag| {
423 let name = tag.get("name")?.as_str()?.to_string();
424
425 let id = match tag.get("id") {
426 Some(serde_json::Value::Number(num)) => num.to_string(),
427 Some(serde_json::Value::String(str_id)) => str_id.clone(),
428 _ => return None,
429 };
430
431 Some((id, name))
432 })
433 .collect::<Vec<(String, String)>>(),
434 _ => {
435 warn!("No tags found for list '{}', skipping sync.", list_id);
436 return Ok(());
437 }
438 };
439
440 let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
442 conn,
443 course_language_group_id,
444 marketing_mailing_list_access_token_id,
445 )
446 .await?;
447
448 for (tag_id, tag_name) in &mailchimp_tags {
450 if let Some(db_tag) = db_tags.iter().find(|db_tag| {
451 db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
452 }) {
453 let db_tag_name = db_tag
454 .get("tag_name")
455 .and_then(|v| v.as_str())
456 .unwrap_or_default();
457 if db_tag_name != tag_name {
458 headless_lms_models::marketing_consents::upsert_tag(
459 conn,
460 course_language_group_id,
461 marketing_mailing_list_access_token_id,
462 tag_id.clone(),
463 tag_name.clone(),
464 )
465 .await?;
466 }
467 }
468 }
469
470 for db_tag in db_tags.iter() {
472 let db_tag_id = db_tag
473 .get("id")
474 .and_then(|v| v.as_str())
475 .map(|v| v.trim().to_string())
476 .unwrap_or_default();
477
478 if !mailchimp_tags
480 .iter()
481 .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
482 {
483 if db_tag_id.is_empty() {
484 warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
485 continue;
486 }
487 headless_lms_models::marketing_consents::delete_tag(
488 conn,
489 db_tag_id.clone(),
490 course_language_group_id,
491 )
492 .await?;
493 }
494 }
495
496 Ok(())
497}
498
499async fn sync_contacts(
502 conn: &mut PgConnection,
503 _config: &SyncerConfig,
504 process_unsubscribes: bool,
505) -> anyhow::Result<()> {
506 let access_tokens =
507 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
508 conn,
509 )
510 .await?;
511
512 let mut successfully_synced_user_ids = Vec::new();
513
514 for token in access_tokens {
516 if process_unsubscribes {
518 let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
519 &token.mailchimp_mailing_list_id,
520 &token.server_prefix,
521 &token.access_token,
522 1000,
523 )
524 .await?;
525
526 info!(
527 "Processing Mailchimp data for list: {}",
528 token.mailchimp_mailing_list_id
529 );
530
531 process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
532 }
533
534 let users_with_unsynced_emails =
536 headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
537 conn,
538 token.course_language_group_id,
539 )
540 .await?;
541
542 info!(
543 "Found {} unsynced user email(s) for course language group: {}",
544 users_with_unsynced_emails.len(),
545 token.course_language_group_id
546 );
547
548 if !users_with_unsynced_emails.is_empty() {
549 let email_synced_user_ids = update_emails_in_mailchimp(
550 users_with_unsynced_emails,
551 &token.mailchimp_mailing_list_id,
552 &token.server_prefix,
553 &token.access_token,
554 )
555 .await?;
556
557 successfully_synced_user_ids.extend(email_synced_user_ids);
559 }
560
561 let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;
562
563 let unsynced_users_details =
565 headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
566 conn,
567 token.course_language_group_id,
568 )
569 .await?;
570
571 info!(
572 "Found {} unsynced user consent(s) for course language group: {}",
573 unsynced_users_details.len(),
574 token.course_language_group_id
575 );
576
577 if !unsynced_users_details.is_empty() {
578 let consent_synced_user_ids =
579 send_users_to_mailchimp(conn, token, unsynced_users_details, tag_objects).await?;
580
581 successfully_synced_user_ids.extend(consent_synced_user_ids);
583 }
584 }
585
586 if !successfully_synced_user_ids.is_empty() {
588 match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
589 conn,
590 &successfully_synced_user_ids,
591 )
592 .await
593 {
594 Ok(_) => {
595 info!(
596 "Successfully updated synced status for {} users.",
597 successfully_synced_user_ids.len()
598 );
599 }
600 Err(e) => {
601 error!(
602 "Failed to update synced status for {} users: {:?}",
603 successfully_synced_user_ids.len(),
604 e
605 );
606 }
607 }
608 }
609
610 Ok(())
611}
612
613pub async fn send_users_to_mailchimp(
615 conn: &mut PgConnection,
616 token: MarketingMailingListAccessToken,
617 users_details: Vec<UserMarketingConsentWithDetails>,
618 tag_objects: Vec<serde_json::Value>,
619) -> anyhow::Result<Vec<Uuid>> {
620 let mut users_data_in_json = vec![];
621 let mut user_ids = vec![];
622 let mut successfully_synced_user_ids = Vec::new();
623 let mut user_id_contact_id_pairs = Vec::new();
624
625 for user in &users_details {
627 if let Some(ref subscription) = user.email_subscription_in_mailchimp {
629 if subscription == "subscribed" {
630 let user_details = json!({
631 "email_address": user.email,
632 "status": user.email_subscription_in_mailchimp,
633 "merge_fields": {
634 "FNAME": user.first_name.clone().unwrap_or("".to_string()),
635 "LNAME": user.last_name.clone().unwrap_or("".to_string()),
636 "MARKETING": if user.consent { "allowed" } else { "disallowed" },
637 "LOCALE": user.locale,
638 "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
640 "USERID": user.user_id,
641 "COURSEID": user.course_id,
642 "LANGGRPID": user.course_language_group_id,
643 "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
644 "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
645 },
646 "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
647 });
648 users_data_in_json.push(user_details);
649 user_ids.push(user.id);
650 }
651 }
652 }
653
654 let batch_request = json!({
655 "members": users_data_in_json,
656 "update_existing":true
657 });
658
659 let url = format!(
660 "https://{}.api.mailchimp.com/3.0/lists/{}",
661 token.server_prefix, token.mailchimp_mailing_list_id
662 );
663
664 if users_data_in_json.is_empty() {
666 info!("No new users to sync.");
667 return Ok(vec![]);
668 }
669
670 let response = REQWEST_CLIENT
672 .post(&url)
673 .header("Content-Type", "application/json")
674 .header("Authorization", format!("apikey {}", token.access_token))
675 .json(&batch_request)
676 .send()
677 .await?;
678
679 if response.status().is_success() {
680 let response_data: serde_json::Value = response.json().await?;
681
682 for key in &["new_members", "updated_members"] {
684 if let Some(members) = response_data[key].as_array() {
685 for member in members {
686 if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
687 if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
688 successfully_synced_user_ids.push(uuid);
689 }
690 if let Some(contact_id) = member["contact_id"].as_str() {
691 user_id_contact_id_pairs
692 .push((user_id.to_string(), contact_id.to_string()));
693 }
694 }
695 }
696 }
697 }
698 headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
700 conn,
701 user_id_contact_id_pairs,
702 )
703 .await?;
704
705 Ok(successfully_synced_user_ids)
707 } else {
708 let status = response.status();
709 let error_text = response.text().await?;
710 Err(anyhow::anyhow!(
711 "Error syncing users to Mailchimp. Status: {}. Error: {}",
712 status,
713 error_text
714 ))
715 }
716}
717
718async fn update_emails_in_mailchimp(
720 users: Vec<UserEmailSubscription>,
721 list_id: &str,
722 server_prefix: &str,
723 access_token: &str,
724) -> anyhow::Result<Vec<Uuid>> {
725 let mut successfully_synced_user_ids = Vec::new();
726 let mut failed_user_ids = Vec::new();
727
728 for user in users {
729 if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
730 if let Some(ref status) = user.email_subscription_in_mailchimp {
731 if status != "subscribed" {
732 continue; }
734 }
735
736 let url = format!(
737 "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
738 server_prefix, list_id, user_mailchimp_id
739 );
740
741 let body = serde_json::json!({
743 "email_address": &user.email,
744 "status": &user.email_subscription_in_mailchimp,
745 });
746
747 let update_response = REQWEST_CLIENT
749 .put(&url)
750 .header("Authorization", format!("apikey {}", access_token))
751 .json(&body)
752 .send()
753 .await?;
754
755 if update_response.status().is_success() {
756 successfully_synced_user_ids.push(user.user_id);
757 } else {
758 failed_user_ids.push(user.user_id);
759 }
760 } else {
761 continue;
762 }
763 }
764
765 if !failed_user_ids.is_empty() {
766 info!("Failed to update the following users:");
767 for user_id in &failed_user_ids {
768 error!("User ID: {}", user_id);
769 }
770 }
771
772 Ok(successfully_synced_user_ids)
773}
774
775async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
777 list_id: &str,
778 server_prefix: &str,
779 access_token: &str,
780 chunk_size: usize,
781) -> anyhow::Result<Vec<(String, String, String, String)>> {
782 let mut all_data = Vec::new();
783 let mut offset = 0;
784
785 loop {
786 let url = format!(
787 "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
788 server_prefix, list_id, offset, chunk_size
789 );
790
791 let response = REQWEST_CLIENT
792 .get(&url)
793 .header("Authorization", format!("apikey {}", access_token))
794 .send()
795 .await?
796 .json::<serde_json::Value>()
797 .await?;
798
799 let empty_vec = vec![];
800 let members = response["members"].as_array().unwrap_or(&empty_vec);
801 if members.is_empty() {
802 break;
803 }
804
805 for member in members {
806 if let (Some(status), Some(last_changed), Some(merge_fields)) = (
808 member["status"].as_str(),
809 member["last_changed"].as_str(),
810 member["merge_fields"].as_object(),
811 ) {
812 if let (Some(user_id), Some(language_group_id)) = (
814 merge_fields.get("USERID").and_then(|v| v.as_str()),
815 merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
816 ) {
817 if !user_id.is_empty() && !language_group_id.is_empty() {
819 all_data.push((
820 user_id.to_string(),
821 last_changed.to_string(),
822 language_group_id.to_string(),
823 status.to_string(),
824 ));
825 }
826 }
827 }
828 }
829
830 let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
832 if offset + chunk_size >= total_items {
833 break;
834 }
835
836 offset += chunk_size;
837 }
838
839 Ok(all_data)
840}
841
842const BATCH_SIZE: usize = 1000;
843
844async fn process_unsubscribed_users_from_mailchimp(
845 conn: &mut PgConnection,
846 mailchimp_data: Vec<(String, String, String, String)>,
847) -> anyhow::Result<()> {
848 let total_records = mailchimp_data.len();
850
851 for chunk in mailchimp_data.chunks(BATCH_SIZE) {
852 if chunk.is_empty() {
853 continue;
854 }
855
856 if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
858 conn,
859 chunk.to_vec(),
860 )
861 .await
862 {
863 error!(
864 "Error while processing chunk {}/{}: ",
865 total_records.div_ceil(BATCH_SIZE),
866 e
867 );
868 }
869 }
870
871 Ok(())
872}