Skip to main content

headless_lms_server/programs/mailchimp_syncer/
mailchimp_ops.rs

1use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
2use headless_lms_utils::http::REQWEST_CLIENT;
3use reqwest::Method;
4use secrecy::ExposeSecret;
5use serde_json::Value;
6use std::collections::HashMap;
7use std::time::Duration;
8
9use super::batch_client::{
10    BatchOperation, parse_batch_results_from_tar_gz, parse_operation_response_value,
11    poll_batch_until_finished_with_response_url, submit_batch,
12};
13
/// Number of operations at which [`MailchimpExecutor`] stops sending individual
/// requests and uses the Mailchimp `/batches` API instead.
const DEFAULT_BATCH_THRESHOLD: usize = 5;
15
16#[derive(Debug, Clone)]
17pub struct MailchimpOperation {
18    pub method: Method,
19    pub path: String,
20    pub body: Option<Value>,
21    pub operation_id: Option<String>,
22}
23
24#[derive(Debug, Clone)]
25pub struct MailchimpOpResult {
26    pub operation_id: Option<String>,
27    pub status_code: u16,
28    pub response_raw: Value,
29    pub response_json: Option<Value>,
30    pub error: Option<String>,
31}
32
33impl MailchimpOpResult {
34    pub fn is_success(&self) -> bool {
35        (200..=299).contains(&self.status_code) && self.error.is_none()
36    }
37}
38
/// Executes Mailchimp operations either as individual requests or through the
/// `/batches` API, depending on how many operations are submitted at once.
pub struct MailchimpExecutor {
    /// Operation count at which the `/batches` API is used instead of direct calls.
    batch_threshold: usize,
    /// Timeout handed to batch polling.
    timeout: Duration,
    /// Polling interval handed to batch polling.
    poll_interval: Duration,
}
44
45impl MailchimpExecutor {
46    /// Executes Mailchimp operations via direct calls or the /batches API.
47    pub fn new(timeout: Duration, poll_interval: Duration) -> Self {
48        Self {
49            batch_threshold: DEFAULT_BATCH_THRESHOLD,
50            timeout,
51            poll_interval,
52        }
53    }
54
55    pub async fn execute(
56        &self,
57        token: &MarketingMailingListAccessToken,
58        ops: Vec<MailchimpOperation>,
59    ) -> anyhow::Result<Vec<MailchimpOpResult>> {
60        if ops.is_empty() {
61            return Ok(vec![]);
62        }
63        if ops.len() < self.batch_threshold {
64            Ok(execute_direct(token, ops).await)
65        } else {
66            execute_batch(token, ops, self.timeout, self.poll_interval).await
67        }
68    }
69}
70
71async fn execute_direct(
72    token: &MarketingMailingListAccessToken,
73    ops: Vec<MailchimpOperation>,
74) -> Vec<MailchimpOpResult> {
75    let mut results = Vec::with_capacity(ops.len());
76    for op in ops {
77        let url = format!(
78            "https://{}.api.mailchimp.com/3.0{}",
79            token.server_prefix, op.path
80        );
81        let mut request = REQWEST_CLIENT.request(op.method.clone(), &url).header(
82            "Authorization",
83            format!("apikey {}", token.access_token.expose_secret()),
84        );
85        if let Some(body) = op.body.clone() {
86            request = request.json(&body);
87        }
88        let response = match request.send().await {
89            Ok(resp) => resp,
90            Err(err) => {
91                results.push(MailchimpOpResult {
92                    operation_id: op.operation_id.clone(),
93                    status_code: 0,
94                    response_raw: Value::Null,
95                    response_json: None,
96                    error: Some(format!("Transport error: {}", err)),
97                });
98                continue;
99            }
100        };
101
102        let status_code = response.status().as_u16();
103        let body_bytes = match response.bytes().await {
104            Ok(bytes) => bytes,
105            Err(err) => {
106                results.push(MailchimpOpResult {
107                    operation_id: op.operation_id.clone(),
108                    status_code,
109                    response_raw: Value::Null,
110                    response_json: None,
111                    error: Some(format!("Body read error: {}", err)),
112                });
113                continue;
114            }
115        };
116
117        let response_raw = match serde_json::from_slice::<Value>(&body_bytes) {
118            Ok(json) => json,
119            Err(_) => Value::String(String::from_utf8_lossy(&body_bytes).to_string()),
120        };
121        let response_json = parse_operation_response_value(&response_raw);
122        results.push(MailchimpOpResult {
123            operation_id: op.operation_id.clone(),
124            status_code,
125            response_raw,
126            response_json,
127            error: None,
128        });
129    }
130    results
131}
132
133async fn execute_batch(
134    token: &MarketingMailingListAccessToken,
135    ops: Vec<MailchimpOperation>,
136    timeout: Duration,
137    poll_interval: Duration,
138) -> anyhow::Result<Vec<MailchimpOpResult>> {
139    if ops.is_empty() {
140        return Ok(vec![]);
141    }
142
143    let batch_ops: Vec<BatchOperation> = ops
144        .iter()
145        .map(|op| BatchOperation {
146            method: op.method.as_str().to_string(),
147            path: op.path.clone(),
148            body: op.body.as_ref().map(|b| b.to_string()).unwrap_or_default(),
149            operation_id: op.operation_id.clone(),
150        })
151        .collect();
152
153    let batch_ids = submit_batch(&token.server_prefix, &token.access_token, batch_ops).await?;
154
155    let mut results = Vec::new();
156    for batch_id in &batch_ids {
157        let poll_result = poll_batch_until_finished_with_response_url(
158            &token.server_prefix,
159            &token.access_token,
160            batch_id,
161            timeout,
162            poll_interval,
163        )
164        .await?;
165        if poll_result.total_operations > 0 {
166            info!(
167                "Mailchimp batch {} finished with {} errored operations out of {}",
168                batch_id, poll_result.errored_operations, poll_result.total_operations
169            );
170        }
171        let response_url = match poll_result.response_body_url {
172            Some(url) => url,
173            None => {
174                return Err(anyhow::anyhow!(
175                    "Mailchimp batch {} finished without response_body_url",
176                    batch_id
177                ));
178            }
179        };
180        let response = REQWEST_CLIENT.get(&response_url).send().await?;
181        if !response.status().is_success() {
182            let error_text = response.text().await.unwrap_or_default();
183            return Err(anyhow::anyhow!(
184                "Failed to fetch Mailchimp batch results: {}",
185                error_text
186            ));
187        }
188        let bytes = response.bytes().await?;
189        let batch_results = parse_batch_results_from_tar_gz(&bytes)?;
190        for result in batch_results {
191            let response_json = parse_operation_response_value(&result.response);
192            results.push(MailchimpOpResult {
193                operation_id: result.operation_id.clone(),
194                status_code: result.status_code,
195                response_raw: result.response,
196                response_json,
197                error: None,
198            });
199        }
200    }
201
202    Ok(order_results(&ops, results))
203}
204
205fn order_results(
206    ops: &[MailchimpOperation],
207    results: Vec<MailchimpOpResult>,
208) -> Vec<MailchimpOpResult> {
209    let mut ordered = Vec::new();
210    let mut results_by_id: HashMap<String, Vec<MailchimpOpResult>> = HashMap::new();
211    let mut unmatched = Vec::new();
212
213    for result in results {
214        if let Some(ref op_id) = result.operation_id {
215            results_by_id.entry(op_id.clone()).or_default().push(result);
216        } else {
217            unmatched.push(result);
218        }
219    }
220
221    for op in ops {
222        if let Some(ref op_id) = op.operation_id
223            && let Some(mut entries) = results_by_id.remove(op_id)
224        {
225            ordered.append(&mut entries);
226        }
227    }
228
229    for (_, mut entries) in results_by_id {
230        ordered.append(&mut entries);
231    }
232
233    ordered.extend(unmatched);
234    ordered
235}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a body-less `GET` operation for `path` tagged with `id`.
    fn get_op(path: &str, id: &str) -> MailchimpOperation {
        MailchimpOperation {
            method: Method::GET,
            path: path.to_string(),
            body: None,
            operation_id: Some(id.to_string()),
        }
    }

    /// Builds an empty `200` result tagged with `id`.
    fn ok_result(id: &str) -> MailchimpOpResult {
        MailchimpOpResult {
            operation_id: Some(id.to_string()),
            status_code: 200,
            response_raw: Value::Null,
            response_json: None,
            error: None,
        }
    }

    #[test]
    fn order_results_respects_input_order() {
        let ops = vec![get_op("/a", "u1"), get_op("/b", "u2")];
        // Results arrive in the opposite order of the operations.
        let results = vec![ok_result("u2"), ok_result("u1")];

        let ordered = order_results(&ops, results);

        for (position, expected_id) in ["u1", "u2"].into_iter().enumerate() {
            assert_eq!(ordered[position].operation_id.as_deref(), Some(expected_id));
        }
    }
}