headless_lms_server/programs/mailchimp_syncer/
mailchimp_ops.rs

1use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
2use headless_lms_utils::http::REQWEST_CLIENT;
3use reqwest::Method;
4use serde_json::Value;
5use std::collections::HashMap;
6use std::time::Duration;
7
8use super::batch_client::{
9    BatchOperation, parse_batch_results_from_tar_gz, parse_operation_response_value,
10    poll_batch_until_finished_with_response_url, submit_batch,
11};
12
13const DEFAULT_BATCH_THRESHOLD: usize = 5;
14
15#[derive(Debug, Clone)]
16pub struct MailchimpOperation {
17    pub method: Method,
18    pub path: String,
19    pub body: Option<Value>,
20    pub operation_id: Option<String>,
21}
22
23#[derive(Debug, Clone)]
24pub struct MailchimpOpResult {
25    pub operation_id: Option<String>,
26    pub status_code: u16,
27    pub response_raw: Value,
28    pub response_json: Option<Value>,
29    pub error: Option<String>,
30}
31
32impl MailchimpOpResult {
33    pub fn is_success(&self) -> bool {
34        (200..=299).contains(&self.status_code) && self.error.is_none()
35    }
36}
37
38pub struct MailchimpExecutor {
39    batch_threshold: usize,
40    timeout: Duration,
41    poll_interval: Duration,
42}
43
44impl MailchimpExecutor {
45    /// Executes Mailchimp operations via direct calls or the /batches API.
46    pub fn new(timeout: Duration, poll_interval: Duration) -> Self {
47        Self {
48            batch_threshold: DEFAULT_BATCH_THRESHOLD,
49            timeout,
50            poll_interval,
51        }
52    }
53
54    pub async fn execute(
55        &self,
56        token: &MarketingMailingListAccessToken,
57        ops: Vec<MailchimpOperation>,
58    ) -> anyhow::Result<Vec<MailchimpOpResult>> {
59        if ops.is_empty() {
60            return Ok(vec![]);
61        }
62        if ops.len() < self.batch_threshold {
63            Ok(execute_direct(token, ops).await)
64        } else {
65            execute_batch(token, ops, self.timeout, self.poll_interval).await
66        }
67    }
68}
69
70async fn execute_direct(
71    token: &MarketingMailingListAccessToken,
72    ops: Vec<MailchimpOperation>,
73) -> Vec<MailchimpOpResult> {
74    let mut results = Vec::with_capacity(ops.len());
75    for op in ops {
76        let url = format!(
77            "https://{}.api.mailchimp.com/3.0{}",
78            token.server_prefix, op.path
79        );
80        let mut request = REQWEST_CLIENT
81            .request(op.method.clone(), &url)
82            .header("Authorization", format!("apikey {}", token.access_token));
83        if let Some(body) = op.body.clone() {
84            request = request.json(&body);
85        }
86        let response = match request.send().await {
87            Ok(resp) => resp,
88            Err(err) => {
89                results.push(MailchimpOpResult {
90                    operation_id: op.operation_id.clone(),
91                    status_code: 0,
92                    response_raw: Value::Null,
93                    response_json: None,
94                    error: Some(format!("Transport error: {}", err)),
95                });
96                continue;
97            }
98        };
99
100        let status_code = response.status().as_u16();
101        let body_bytes = match response.bytes().await {
102            Ok(bytes) => bytes,
103            Err(err) => {
104                results.push(MailchimpOpResult {
105                    operation_id: op.operation_id.clone(),
106                    status_code,
107                    response_raw: Value::Null,
108                    response_json: None,
109                    error: Some(format!("Body read error: {}", err)),
110                });
111                continue;
112            }
113        };
114
115        let response_raw = match serde_json::from_slice::<Value>(&body_bytes) {
116            Ok(json) => json,
117            Err(_) => Value::String(String::from_utf8_lossy(&body_bytes).to_string()),
118        };
119        let response_json = parse_operation_response_value(&response_raw);
120        results.push(MailchimpOpResult {
121            operation_id: op.operation_id.clone(),
122            status_code,
123            response_raw,
124            response_json,
125            error: None,
126        });
127    }
128    results
129}
130
131async fn execute_batch(
132    token: &MarketingMailingListAccessToken,
133    ops: Vec<MailchimpOperation>,
134    timeout: Duration,
135    poll_interval: Duration,
136) -> anyhow::Result<Vec<MailchimpOpResult>> {
137    if ops.is_empty() {
138        return Ok(vec![]);
139    }
140
141    let batch_ops: Vec<BatchOperation> = ops
142        .iter()
143        .map(|op| BatchOperation {
144            method: op.method.as_str().to_string(),
145            path: op.path.clone(),
146            body: op.body.as_ref().map(|b| b.to_string()).unwrap_or_default(),
147            operation_id: op.operation_id.clone(),
148        })
149        .collect();
150
151    let batch_ids = submit_batch(&token.server_prefix, &token.access_token, batch_ops).await?;
152
153    let mut results = Vec::new();
154    for batch_id in &batch_ids {
155        let poll_result = poll_batch_until_finished_with_response_url(
156            &token.server_prefix,
157            &token.access_token,
158            batch_id,
159            timeout,
160            poll_interval,
161        )
162        .await?;
163        if poll_result.total_operations > 0 {
164            info!(
165                "Mailchimp batch {} finished with {} errored operations out of {}",
166                batch_id, poll_result.errored_operations, poll_result.total_operations
167            );
168        }
169        let response_url = match poll_result.response_body_url {
170            Some(url) => url,
171            None => {
172                return Err(anyhow::anyhow!(
173                    "Mailchimp batch {} finished without response_body_url",
174                    batch_id
175                ));
176            }
177        };
178        let response = REQWEST_CLIENT.get(&response_url).send().await?;
179        if !response.status().is_success() {
180            let error_text = response.text().await.unwrap_or_default();
181            return Err(anyhow::anyhow!(
182                "Failed to fetch Mailchimp batch results: {}",
183                error_text
184            ));
185        }
186        let bytes = response.bytes().await?;
187        let batch_results = parse_batch_results_from_tar_gz(&bytes)?;
188        for result in batch_results {
189            let response_json = parse_operation_response_value(&result.response);
190            results.push(MailchimpOpResult {
191                operation_id: result.operation_id.clone(),
192                status_code: result.status_code,
193                response_raw: result.response,
194                response_json,
195                error: None,
196            });
197        }
198    }
199
200    Ok(order_results(&ops, results))
201}
202
203fn order_results(
204    ops: &[MailchimpOperation],
205    results: Vec<MailchimpOpResult>,
206) -> Vec<MailchimpOpResult> {
207    let mut ordered = Vec::new();
208    let mut results_by_id: HashMap<String, Vec<MailchimpOpResult>> = HashMap::new();
209    let mut unmatched = Vec::new();
210
211    for result in results {
212        if let Some(ref op_id) = result.operation_id {
213            results_by_id.entry(op_id.clone()).or_default().push(result);
214        } else {
215            unmatched.push(result);
216        }
217    }
218
219    for op in ops {
220        if let Some(ref op_id) = op.operation_id
221            && let Some(mut entries) = results_by_id.remove(op_id)
222        {
223            ordered.append(&mut entries);
224        }
225    }
226
227    for (_, mut entries) in results_by_id {
228        ordered.append(&mut entries);
229    }
230
231    ordered.extend(unmatched);
232    ordered
233}
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238
239    #[test]
240    fn order_results_respects_input_order() {
241        let ops = vec![
242            MailchimpOperation {
243                method: Method::GET,
244                path: "/a".to_string(),
245                body: None,
246                operation_id: Some("u1".to_string()),
247            },
248            MailchimpOperation {
249                method: Method::GET,
250                path: "/b".to_string(),
251                body: None,
252                operation_id: Some("u2".to_string()),
253            },
254        ];
255        let results = vec![
256            MailchimpOpResult {
257                operation_id: Some("u2".to_string()),
258                status_code: 200,
259                response_raw: Value::Null,
260                response_json: None,
261                error: None,
262            },
263            MailchimpOpResult {
264                operation_id: Some("u1".to_string()),
265                status_code: 200,
266                response_raw: Value::Null,
267                response_json: None,
268                error: None,
269            },
270        ];
271        let ordered = order_results(&ops, results);
272        assert_eq!(ordered[0].operation_id.as_deref(), Some("u1"));
273        assert_eq!(ordered[1].operation_id.as_deref(), Some("u2"));
274    }
275}