1use crate::setup_tracing;
2use dotenv::dotenv;
3use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
4use headless_lms_models::marketing_consents::UserEmailSubscription;
5use headless_lms_models::marketing_consents::UserMarketingConsentWithDetails;
6use headless_lms_utils::http::REQWEST_CLIENT;
7use serde::Deserialize;
8use serde_json::json;
9use sqlx::{PgConnection, PgPool};
10use std::{
11 env,
12 time::{Duration, Instant},
13};
14use uuid::Uuid;
15
16#[derive(Debug, Deserialize)]
17struct MailchimpField {
18 field_id: String,
19 field_name: String,
20}
21
22#[derive(Debug)]
23struct FieldSchema {
24 tag: &'static str,
25 name: &'static str,
26 default_value: &'static str,
27}
28
29const REQUIRED_FIELDS: &[FieldSchema] = &[
30 FieldSchema {
31 tag: "FNAME",
32 name: "First Name",
33 default_value: "",
34 },
35 FieldSchema {
36 tag: "LNAME",
37 name: "Last Name",
38 default_value: "",
39 },
40 FieldSchema {
41 tag: "MARKETING",
42 name: "Accepts Marketing",
43 default_value: "disallowed",
44 },
45 FieldSchema {
46 tag: "LOCALE",
47 name: "Locale",
48 default_value: "en",
49 },
50 FieldSchema {
51 tag: "GRADUATED",
52 name: "Graduated",
53 default_value: "",
54 },
55 FieldSchema {
56 tag: "COURSEID",
57 name: "Course ID",
58 default_value: "",
59 },
60 FieldSchema {
61 tag: "LANGGRPID",
62 name: "Course language Group ID",
63 default_value: "",
64 },
65 FieldSchema {
66 tag: "USERID",
67 name: "User ID",
68 default_value: "",
69 },
70 FieldSchema {
71 tag: "RESEARCH",
72 name: "Research consent",
73 default_value: "false",
74 },
75];
76
77const FIELDS_EXCLUDED_FROM_REMOVING: &[&str] = &["PHONE", "PACE", "COUNTRY", "MMERGE9"];
79const REMOVE_UNSUPPORTED_FIELDS: bool = false;
80const PROCESS_UNSUBSCRIBES_INTERVAL_SECS: u64 = 10_800;
81
82const SYNC_INTERVAL_SECS: u64 = 10;
83const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
84
85pub async fn main() -> anyhow::Result<()> {
87 initialize_environment()?;
88
89 let config = initialize_configuration().await?;
90
91 let db_pool = initialize_database_pool(&config.database_url).await?;
92 let mut conn = db_pool.acquire().await?;
93
94 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
95 let mut ticks = 0;
96
97 let access_tokens =
98 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
99 &mut conn,
100 )
101 .await?;
102
103 for token in &access_tokens {
105 if let Err(e) = ensure_mailchimp_schema(
106 &token.mailchimp_mailing_list_id,
107 &token.server_prefix,
108 &token.access_token,
109 )
110 .await
111 {
112 error!(
113 "Failed to set up Mailchimp schema for list '{}': {:?}",
114 token.mailchimp_mailing_list_id, e
115 );
116 return Err(e);
117 }
118 }
119
120 info!("Starting mailchimp syncer.");
121
122 let mut last_time_unsubscribes_processed = Instant::now();
123 let mut last_time_tags_synced = Instant::now();
124
125 loop {
126 interval.tick().await;
127 ticks += 1;
128
129 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
130 ticks = 0;
131 info!("Still syncing.");
132 }
133 let mut process_unsubscribes = false;
134 if last_time_unsubscribes_processed.elapsed().as_secs()
135 >= PROCESS_UNSUBSCRIBES_INTERVAL_SECS
136 {
137 process_unsubscribes = true;
138 last_time_unsubscribes_processed = Instant::now();
139 };
140
141 if last_time_tags_synced.elapsed().as_secs() >= 3600 {
143 info!("Syncing tags from Mailchimp...");
144 for token in &access_tokens {
145 if let Err(e) = sync_tags_from_mailchimp(
146 &mut conn,
147 &token.mailchimp_mailing_list_id,
148 &token.access_token,
149 &token.server_prefix,
150 token.id,
151 token.course_language_group_id,
152 )
153 .await
154 {
155 error!(
156 "Failed to sync tags for list '{}': {:?}",
157 token.mailchimp_mailing_list_id, e
158 );
159 }
160 }
161 last_time_tags_synced = Instant::now();
162 }
163
164 if let Err(e) = sync_contacts(&mut conn, &config, process_unsubscribes).await {
165 error!("Error during synchronization: {:?}", e);
166 if let Ok(sqlx::Error::Io(..)) = e.downcast::<sqlx::Error>() {
167 info!("syncer may have lost its connection to the db, trying to reconnect");
169 conn = db_pool.acquire().await?;
170 }
171 }
172 }
173}
174
175fn initialize_environment() -> anyhow::Result<()> {
177 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
179 dotenv().ok();
180 setup_tracing()?;
181 Ok(())
182}
183
184struct SyncerConfig {
186 database_url: String,
187}
188
189async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
191 let database_url = env::var("DATABASE_URL")
192 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
193
194 Ok(SyncerConfig { database_url })
195}
196
197async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
199 PgPool::connect(database_url).await.map_err(|e| {
200 anyhow::anyhow!(
201 "Failed to connect to the database at {}: {:?}",
202 database_url,
203 e
204 )
205 })
206}
207
208async fn ensure_mailchimp_schema(
210 list_id: &str,
211 server_prefix: &str,
212 access_token: &str,
213) -> anyhow::Result<()> {
214 let existing_fields =
215 fetch_current_mailchimp_fields(list_id, server_prefix, access_token).await?;
216
217 if REMOVE_UNSUPPORTED_FIELDS {
218 for field in existing_fields.iter() {
220 if !REQUIRED_FIELDS
221 .iter()
222 .any(|r| r.tag == field.field_name.as_str())
223 && !FIELDS_EXCLUDED_FROM_REMOVING.contains(&field.field_name.as_str())
224 {
225 match remove_field_from_mailchimp(
226 list_id,
227 &field.field_id,
228 server_prefix,
229 access_token,
230 )
231 .await
232 {
233 Err(e) => {
234 warn!("Could not remove field '{}': {}", field.field_name, e);
235 }
236 _ => {
237 info!("Removed field '{}'", field.field_name);
238 }
239 }
240 }
241 }
242 }
243
244 for required_field in REQUIRED_FIELDS.iter() {
246 if !existing_fields
247 .iter()
248 .any(|f| f.field_name == required_field.tag)
249 {
250 match add_field_to_mailchimp(list_id, required_field, server_prefix, access_token).await
251 {
252 Err(e) => {
253 warn!(
254 "Failed to add required field '{}': {}",
255 required_field.name, e
256 );
257 }
258 _ => {
259 info!(
260 "Successfully added required field '{}'",
261 required_field.name
262 );
263 }
264 }
265 } else {
266 info!(
267 "Field '{}' already exists, skipping addition.",
268 required_field.name
269 );
270 }
271 }
272
273 Ok(())
274}
275
276async fn fetch_current_mailchimp_fields(
278 list_id: &str,
279 server_prefix: &str,
280 access_token: &str,
281) -> Result<Vec<MailchimpField>, anyhow::Error> {
282 let url = format!(
283 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
284 server_prefix, list_id
285 );
286
287 let response = REQWEST_CLIENT
288 .get(&url)
289 .header("Authorization", format!("apikey {}", access_token))
290 .send()
291 .await?;
292
293 if response.status().is_success() {
294 let json = response.json::<serde_json::Value>().await?;
295
296 let fields: Vec<MailchimpField> = json["merge_fields"]
297 .as_array()
298 .unwrap_or(&vec![])
299 .iter()
300 .filter_map(|field| {
301 let field_id = field["merge_id"].as_u64();
302 let field_name = field["tag"].as_str();
303
304 if let (Some(field_id), Some(field_name)) = (field_id, field_name) {
305 Some(MailchimpField {
306 field_id: field_id.to_string(),
307 field_name: field_name.to_string(),
308 })
309 } else {
310 None
311 }
312 })
313 .collect();
314
315 Ok(fields)
316 } else {
317 let error_text = response.text().await?;
318 error!("Error fetching merge fields: {}", error_text);
319 Err(anyhow::anyhow!("Failed to fetch current Mailchimp fields."))
320 }
321}
322
323async fn add_field_to_mailchimp(
325 list_id: &str,
326 field_schema: &FieldSchema,
327 server_prefix: &str,
328 access_token: &str,
329) -> anyhow::Result<()> {
330 let url = format!(
331 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields",
332 server_prefix, list_id
333 );
334
335 let body = json!({
336 "tag": field_schema.tag,
337 "name": field_schema.name,
338 "type": "text",
339 "default_value": field_schema.default_value,
340 });
341
342 let response = REQWEST_CLIENT
343 .post(&url)
344 .header("Authorization", format!("apikey {}", access_token))
345 .json(&body)
346 .send()
347 .await?;
348
349 if response.status().is_success() {
350 Ok(())
351 } else {
352 let status = response.status();
353 let error_text = response
354 .text()
355 .await
356 .unwrap_or_else(|_| "No additional error info.".to_string());
357 Err(anyhow::anyhow!(
358 "Failed to add field to Mailchimp. Status: {}. Error: {}",
359 status,
360 error_text
361 ))
362 }
363}
364
365async fn remove_field_from_mailchimp(
367 list_id: &str,
368 field_id: &str,
369 server_prefix: &str,
370 access_token: &str,
371) -> anyhow::Result<()> {
372 let url = format!(
373 "https://{}.api.mailchimp.com/3.0/lists/{}/merge-fields/{}",
374 server_prefix, list_id, field_id
375 );
376
377 let response = REQWEST_CLIENT
378 .delete(&url)
379 .header("Authorization", format!("apikey {}", access_token))
380 .send()
381 .await?;
382
383 if response.status().is_success() {
384 Ok(())
385 } else {
386 let status = response.status();
387 let error_text = response
388 .text()
389 .await
390 .unwrap_or_else(|_| "No additional error info.".to_string());
391 Err(anyhow::anyhow!(
392 "Failed to remove field from Mailchimp. Status: {}. Error: {}",
393 status,
394 error_text
395 ))
396 }
397}
398
399pub async fn sync_tags_from_mailchimp(
401 conn: &mut PgConnection,
402 list_id: &str,
403 access_token: &str,
404 server_prefix: &str,
405 marketing_mailing_list_access_token_id: Uuid,
406 course_language_group_id: Uuid,
407) -> anyhow::Result<()> {
408 let url = format!(
409 "https://{}.api.mailchimp.com/3.0/lists/{}/tag-search",
410 server_prefix, list_id
411 );
412
413 let response = REQWEST_CLIENT
414 .get(&url)
415 .header("Authorization", format!("apikey {}", access_token))
416 .send()
417 .await?;
418
419 let response_json = response.json::<serde_json::Value>().await?;
421
422 let mailchimp_tags = match response_json.get("tags") {
423 Some(tags) if tags.is_array() => tags
424 .as_array()
425 .unwrap()
426 .iter()
427 .filter_map(|tag| {
428 let name = tag.get("name")?.as_str()?.to_string();
429
430 let id = match tag.get("id") {
431 Some(serde_json::Value::Number(num)) => num.to_string(),
432 Some(serde_json::Value::String(str_id)) => str_id.clone(),
433 _ => return None,
434 };
435
436 Some((id, name))
437 })
438 .collect::<Vec<(String, String)>>(),
439 _ => {
440 warn!("No tags found for list '{}', skipping sync.", list_id);
441 return Ok(());
442 }
443 };
444
445 let db_tags = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(
447 conn,
448 course_language_group_id,
449 marketing_mailing_list_access_token_id,
450 )
451 .await?;
452
453 for (tag_id, tag_name) in &mailchimp_tags {
455 if let Some(db_tag) = db_tags.iter().find(|db_tag| {
456 db_tag.get("id").and_then(|v| v.as_str()).map(|v| v.trim()) == Some(tag_id.trim())
457 }) {
458 let db_tag_name = db_tag
459 .get("tag_name")
460 .and_then(|v| v.as_str())
461 .unwrap_or_default();
462 if db_tag_name != tag_name {
463 headless_lms_models::marketing_consents::upsert_tag(
464 conn,
465 course_language_group_id,
466 marketing_mailing_list_access_token_id,
467 tag_id.clone(),
468 tag_name.clone(),
469 )
470 .await?;
471 }
472 }
473 }
474
475 for db_tag in db_tags.iter() {
477 let db_tag_id = db_tag
478 .get("id")
479 .and_then(|v| v.as_str())
480 .map(|v| v.trim().to_string())
481 .unwrap_or_default();
482
483 if !mailchimp_tags
485 .iter()
486 .any(|(tag_id, _)| tag_id.trim() == db_tag_id.trim())
487 {
488 if db_tag_id.is_empty() {
489 warn!("Skipping tag deletion due to missing ID: {:?}", db_tag);
490 continue;
491 }
492 headless_lms_models::marketing_consents::delete_tag(
493 conn,
494 db_tag_id.clone(),
495 course_language_group_id,
496 )
497 .await?;
498 }
499 }
500
501 Ok(())
502}
503
504async fn sync_contacts(
507 conn: &mut PgConnection,
508 _config: &SyncerConfig,
509 process_unsubscribes: bool,
510) -> anyhow::Result<()> {
511 let access_tokens =
512 headless_lms_models::marketing_consents::fetch_all_marketing_mailing_list_access_tokens(
513 conn,
514 )
515 .await?;
516
517 let mut successfully_synced_user_ids = Vec::new();
518
519 for token in access_tokens {
521 if process_unsubscribes {
523 let mailchimp_data = fetch_unsubscribed_users_from_mailchimp_in_chunks(
524 &token.mailchimp_mailing_list_id,
525 &token.server_prefix,
526 &token.access_token,
527 1000,
528 )
529 .await?;
530
531 info!(
532 "Processing Mailchimp data for list: {}",
533 token.mailchimp_mailing_list_id
534 );
535
536 process_unsubscribed_users_from_mailchimp(conn, mailchimp_data).await?;
537 }
538
539 let users_with_unsynced_emails =
541 headless_lms_models::marketing_consents::fetch_all_unsynced_updated_emails(
542 conn,
543 token.course_language_group_id,
544 )
545 .await?;
546
547 info!(
548 "Found {} unsynced user email(s) for course language group: {}",
549 users_with_unsynced_emails.len(),
550 token.course_language_group_id
551 );
552
553 if !users_with_unsynced_emails.is_empty() {
554 let email_synced_user_ids = update_emails_in_mailchimp(
555 users_with_unsynced_emails,
556 &token.mailchimp_mailing_list_id,
557 &token.server_prefix,
558 &token.access_token,
559 )
560 .await?;
561
562 successfully_synced_user_ids.extend(email_synced_user_ids);
564 }
565
566 let tag_objects = headless_lms_models::marketing_consents::fetch_tags_with_course_language_group_id_and_marketing_mailing_list_access_token_id(conn, token.course_language_group_id, token.id).await?;
567
568 let unsynced_users_details =
570 headless_lms_models::marketing_consents::fetch_all_unsynced_user_marketing_consents_by_course_language_group_id(
571 conn,
572 token.course_language_group_id,
573 )
574 .await?;
575
576 info!(
577 "Found {} unsynced user consent(s) for course language group: {}",
578 unsynced_users_details.len(),
579 token.course_language_group_id
580 );
581
582 if !unsynced_users_details.is_empty() {
583 let consent_synced_user_ids =
584 send_users_to_mailchimp(conn, token, unsynced_users_details, tag_objects).await?;
585
586 successfully_synced_user_ids.extend(consent_synced_user_ids);
588 }
589 }
590
591 if !successfully_synced_user_ids.is_empty() {
593 match headless_lms_models::marketing_consents::update_synced_to_mailchimp_at_to_all_synced_users(
594 conn,
595 &successfully_synced_user_ids,
596 )
597 .await
598 {
599 Ok(_) => {
600 info!(
601 "Successfully updated synced status for {} users.",
602 successfully_synced_user_ids.len()
603 );
604 }
605 Err(e) => {
606 error!(
607 "Failed to update synced status for {} users: {:?}",
608 successfully_synced_user_ids.len(),
609 e
610 );
611 }
612 }
613 }
614
615 Ok(())
616}
617
618pub async fn send_users_to_mailchimp(
620 conn: &mut PgConnection,
621 token: MarketingMailingListAccessToken,
622 users_details: Vec<UserMarketingConsentWithDetails>,
623 tag_objects: Vec<serde_json::Value>,
624) -> anyhow::Result<Vec<Uuid>> {
625 let mut users_data_in_json = vec![];
626 let mut user_ids = vec![];
627 let mut successfully_synced_user_ids = Vec::new();
628 let mut user_id_contact_id_pairs = Vec::new();
629
630 for user in &users_details {
632 if let Some(ref subscription) = user.email_subscription_in_mailchimp {
634 if subscription == "subscribed" {
635 let user_details = json!({
636 "email_address": user.email,
637 "status": user.email_subscription_in_mailchimp,
638 "merge_fields": {
639 "FNAME": user.first_name.clone().unwrap_or("".to_string()),
640 "LNAME": user.last_name.clone().unwrap_or("".to_string()),
641 "MARKETING": if user.consent { "allowed" } else { "disallowed" },
642 "LOCALE": user.locale,
643 "GRADUATED": user.completed_course_at.map(|cca| cca.to_rfc3339()).unwrap_or("".to_string()),
645 "USERID": user.user_id,
646 "COURSEID": user.course_id,
647 "LANGGRPID": user.course_language_group_id,
648 "RESEARCH" : if user.research_consent.unwrap_or(false) { "allowed" } else { "disallowed" },
649 "COUNTRY" : user.country.clone().unwrap_or("".to_string()),
650 },
651 "tags": tag_objects.iter().map(|tag| tag["name"].clone()).collect::<Vec<_>>()
652 });
653 users_data_in_json.push(user_details);
654 user_ids.push(user.id);
655 }
656 }
657 }
658
659 let batch_request = json!({
660 "members": users_data_in_json,
661 "update_existing":true
662 });
663
664 let url = format!(
665 "https://{}.api.mailchimp.com/3.0/lists/{}",
666 token.server_prefix, token.mailchimp_mailing_list_id
667 );
668
669 if users_data_in_json.is_empty() {
671 info!("No new users to sync.");
672 return Ok(vec![]);
673 }
674
675 let response = REQWEST_CLIENT
677 .post(&url)
678 .header("Content-Type", "application/json")
679 .header("Authorization", format!("apikey {}", token.access_token))
680 .json(&batch_request)
681 .send()
682 .await?;
683
684 if response.status().is_success() {
685 let response_data: serde_json::Value = response.json().await?;
686
687 for key in &["new_members", "updated_members"] {
689 if let Some(members) = response_data[key].as_array() {
690 for member in members {
691 if let Some(user_id) = member["merge_fields"]["USERID"].as_str() {
692 if let Ok(uuid) = uuid::Uuid::parse_str(user_id) {
693 successfully_synced_user_ids.push(uuid);
694 }
695 if let Some(contact_id) = member["contact_id"].as_str() {
696 user_id_contact_id_pairs
697 .push((user_id.to_string(), contact_id.to_string()));
698 }
699 }
700 }
701 }
702 }
703 headless_lms_models::marketing_consents::update_user_mailchimp_id_at_to_all_synced_users(
705 conn,
706 user_id_contact_id_pairs,
707 )
708 .await?;
709
710 Ok(successfully_synced_user_ids)
712 } else {
713 let status = response.status();
714 let error_text = response.text().await?;
715 Err(anyhow::anyhow!(
716 "Error syncing users to Mailchimp. Status: {}. Error: {}",
717 status,
718 error_text
719 ))
720 }
721}
722
723async fn update_emails_in_mailchimp(
725 users: Vec<UserEmailSubscription>,
726 list_id: &str,
727 server_prefix: &str,
728 access_token: &str,
729) -> anyhow::Result<Vec<Uuid>> {
730 let mut successfully_synced_user_ids = Vec::new();
731 let mut failed_user_ids = Vec::new();
732
733 for user in users {
734 if let Some(ref user_mailchimp_id) = user.user_mailchimp_id {
735 if let Some(ref status) = user.email_subscription_in_mailchimp {
736 if status != "subscribed" {
737 continue; }
739 }
740
741 let url = format!(
742 "https://{}.api.mailchimp.com/3.0/lists/{}/members/{}",
743 server_prefix, list_id, user_mailchimp_id
744 );
745
746 let body = serde_json::json!({
748 "email_address": &user.email,
749 "status": &user.email_subscription_in_mailchimp,
750 });
751
752 let update_response = REQWEST_CLIENT
754 .put(&url)
755 .header("Authorization", format!("apikey {}", access_token))
756 .json(&body)
757 .send()
758 .await?;
759
760 if update_response.status().is_success() {
761 successfully_synced_user_ids.push(user.user_id);
762 } else {
763 failed_user_ids.push(user.user_id);
764 }
765 } else {
766 continue;
767 }
768 }
769
770 if !failed_user_ids.is_empty() {
771 info!("Failed to update the following users:");
772 for user_id in &failed_user_ids {
773 error!("User ID: {}", user_id);
774 }
775 }
776
777 Ok(successfully_synced_user_ids)
778}
779
780async fn fetch_unsubscribed_users_from_mailchimp_in_chunks(
782 list_id: &str,
783 server_prefix: &str,
784 access_token: &str,
785 chunk_size: usize,
786) -> anyhow::Result<Vec<(String, String, String, String)>> {
787 let mut all_data = Vec::new();
788 let mut offset = 0;
789
790 loop {
791 let url = format!(
792 "https://{}.api.mailchimp.com/3.0/lists/{}/members?offset={}&count={}&fields=members.merge_fields,members.status,members.last_changed&status=unsubscribed,non-subscribed",
793 server_prefix, list_id, offset, chunk_size
794 );
795
796 let response = REQWEST_CLIENT
797 .get(&url)
798 .header("Authorization", format!("apikey {}", access_token))
799 .send()
800 .await?
801 .json::<serde_json::Value>()
802 .await?;
803
804 let empty_vec = vec![];
805 let members = response["members"].as_array().unwrap_or(&empty_vec);
806 if members.is_empty() {
807 break;
808 }
809
810 for member in members {
811 if let (Some(status), Some(last_changed), Some(merge_fields)) = (
813 member["status"].as_str(),
814 member["last_changed"].as_str(),
815 member["merge_fields"].as_object(),
816 ) {
817 if let (Some(user_id), Some(language_group_id)) = (
819 merge_fields.get("USERID").and_then(|v| v.as_str()),
820 merge_fields.get("LANGGRPID").and_then(|v| v.as_str()),
821 ) {
822 if !user_id.is_empty() && !language_group_id.is_empty() {
824 all_data.push((
825 user_id.to_string(),
826 last_changed.to_string(),
827 language_group_id.to_string(),
828 status.to_string(),
829 ));
830 }
831 }
832 }
833 }
834
835 let total_items = response["total_items"].as_u64().unwrap_or(0) as usize;
837 if offset + chunk_size >= total_items {
838 break;
839 }
840
841 offset += chunk_size;
842 }
843
844 Ok(all_data)
845}
846
847const BATCH_SIZE: usize = 1000;
848
849async fn process_unsubscribed_users_from_mailchimp(
850 conn: &mut PgConnection,
851 mailchimp_data: Vec<(String, String, String, String)>,
852) -> anyhow::Result<()> {
853 let total_records = mailchimp_data.len();
855
856 for chunk in mailchimp_data.chunks(BATCH_SIZE) {
857 if chunk.is_empty() {
858 continue;
859 }
860
861 if let Err(e) = headless_lms_models::marketing_consents::update_unsubscribed_users_from_mailchimp_in_bulk(
863 conn,
864 chunk.to_vec(),
865 )
866 .await
867 {
868 error!(
869 "Error while processing chunk {}/{}: ",
870 total_records.div_ceil(BATCH_SIZE),
871 e
872 );
873 }
874 }
875
876 Ok(())
877}