Skip to main content

headless_lms_server/programs/mailchimp_syncer/
batch_client.rs

1use crate::prelude::*;
2use flate2::read::GzDecoder;
3use headless_lms_utils::http::REQWEST_CLIENT;
4use secrecy::ExposeSecret;
5use serde_json::json;
6use std::{
7    io::{Cursor, Read},
8    time::{Duration, Instant},
9};
10use tar::Archive;
11
12#[derive(Debug, Clone, Serialize)]
13pub struct BatchOperation {
14    pub method: String,
15    pub path: String,
16    pub body: String,
17    #[serde(skip_serializing_if = "Option::is_none")]
18    pub operation_id: Option<String>,
19}
20
/// Summary of a Mailchimp batch that reached the `finished` status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchPollResponse {
    /// The batch's `response_body_url`, from which per-operation results can be downloaded,
    /// if Mailchimp returned one.
    pub response_body_url: Option<String>,
    /// Number of operations Mailchimp reported as failed (`errored_operations`).
    pub errored_operations: u64,
    /// Total number of operations in the batch (`total_operations`).
    pub total_operations: u64,
}
27
28#[derive(Debug)]
29pub struct BatchOperationResult {
30    pub operation_id: Option<String>,
31    pub status_code: u16,
32    pub response: serde_json::Value,
33}
34
35/// Mailchimp API limit for batch operations and batch subscribe (500 items/members per request).
36pub const MAX_MAILCHIMP_BATCH_SIZE: usize = 500;
37
38/// Submits operations to Mailchimp POST /batches. Chunks at MAX_MAILCHIMP_BATCH_SIZE. Returns batch IDs.
39pub async fn submit_batch(
40    server_prefix: &str,
41    access_token: &DbSecret,
42    operations: Vec<BatchOperation>,
43) -> anyhow::Result<Vec<String>> {
44    if operations.is_empty() {
45        return Ok(vec![]);
46    }
47    let total_chunks = operations.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
48    let mut batch_ids = Vec::new();
49    for (chunk_index, chunk) in operations.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
50        info!(
51            "Submitting batch with {} operations (chunk {}/{})",
52            chunk.len(),
53            chunk_index + 1,
54            total_chunks
55        );
56        let body = json!({ "operations": chunk });
57        let url = format!("https://{}.api.mailchimp.com/3.0/batches", server_prefix);
58        let response = REQWEST_CLIENT
59            .post(&url)
60            .header(
61                "Authorization",
62                format!("apikey {}", access_token.expose_secret()),
63            )
64            .json(&body)
65            .send()
66            .await?;
67        if !response.status().is_success() {
68            let error_text = response.text().await.unwrap_or_default();
69            return Err(anyhow::anyhow!(
70                "Mailchimp batch submit failed: {}",
71                error_text
72            ));
73        }
74        let data: serde_json::Value = response.json().await?;
75        let id = data["id"]
76            .as_str()
77            .ok_or_else(|| anyhow::anyhow!("Mailchimp batch response missing id"))?
78            .to_string();
79        batch_ids.push(id);
80    }
81    Ok(batch_ids)
82}
83
84/// Polls GET /batches/{batch_id} until status is "finished", or timeout. Returns response_body_url and error counts.
85pub async fn poll_batch_until_finished_with_response_url(
86    server_prefix: &str,
87    access_token: &DbSecret,
88    batch_id: &str,
89    timeout: Duration,
90    poll_interval: Duration,
91) -> anyhow::Result<BatchPollResponse> {
92    let deadline = Instant::now() + timeout;
93    let start = Instant::now();
94    let mut last_logged_minute = 0u64;
95    let mut interval = tokio::time::interval(poll_interval);
96    loop {
97        interval.tick().await;
98        if Instant::now() >= deadline {
99            return Err(anyhow::anyhow!(
100                "Mailchimp batch {} did not finish within {:?}",
101                batch_id,
102                timeout
103            ));
104        }
105        let url = format!(
106            "https://{}.api.mailchimp.com/3.0/batches/{}",
107            server_prefix, batch_id
108        );
109        let response = REQWEST_CLIENT
110            .get(&url)
111            .header(
112                "Authorization",
113                format!("apikey {}", access_token.expose_secret()),
114            )
115            .send()
116            .await?;
117        if !response.status().is_success() {
118            let error_text = response.text().await.unwrap_or_default();
119            return Err(anyhow::anyhow!(
120                "Mailchimp batch status check failed: {}",
121                error_text
122            ));
123        }
124        let data: serde_json::Value = response.json().await?;
125        let status = data["status"].as_str().unwrap_or("");
126        let elapsed = start.elapsed();
127        if elapsed.as_secs() >= 60 {
128            let minute = elapsed.as_secs() / 60;
129            if minute > last_logged_minute {
130                last_logged_minute = minute;
131                info!(
132                    "Batch {} still processing after {}s (status: {})",
133                    batch_id,
134                    elapsed.as_secs(),
135                    status
136                );
137            }
138        }
139        match status {
140            "finished" => {
141                let response_body_url = data["response_body_url"].as_str().map(|s| s.to_string());
142                if let Some(ref url) = response_body_url {
143                    info!("Batch {} results available at: {}", batch_id, url);
144                }
145                let errored = data["errored_operations"].as_u64().unwrap_or(0);
146                let total = data["total_operations"].as_u64().unwrap_or(0);
147                if errored > 0 {
148                    warn!(
149                        "Mailchimp batch {} finished with {} errored operations out of {}",
150                        batch_id, errored, total
151                    );
152                }
153                return Ok(BatchPollResponse {
154                    response_body_url,
155                    errored_operations: errored,
156                    total_operations: total,
157                });
158            }
159            "error" | "expired" => {
160                return Err(anyhow::anyhow!(
161                    "Mailchimp batch {} ended with status: {}",
162                    batch_id,
163                    status
164                ));
165            }
166            _ => {}
167        }
168    }
169}
170
171fn extract_batch_results_from_value(
172    value: serde_json::Value,
173    results: &mut Vec<BatchOperationResult>,
174) -> anyhow::Result<()> {
175    match value {
176        serde_json::Value::Array(items) => {
177            for item in items {
178                extract_batch_results_from_value(item, results)?;
179            }
180        }
181        serde_json::Value::Object(map) => {
182            let status_code = if let Some(code) = map.get("status_code").and_then(|v| v.as_u64()) {
183                Some(code as u16)
184            } else if let Some(code) = map.get("status_code").and_then(|v| v.as_str()) {
185                code.parse::<u16>().ok()
186            } else {
187                None
188            };
189            if let Some(status_code) = status_code {
190                let operation_id = map
191                    .get("operation_id")
192                    .and_then(|v| v.as_str())
193                    .map(|s| s.to_string());
194                let response = map
195                    .get("response")
196                    .cloned()
197                    .unwrap_or(serde_json::Value::Null);
198                results.push(BatchOperationResult {
199                    operation_id,
200                    status_code,
201                    response,
202                });
203            } else if let Some(ops) = map.get("operations") {
204                extract_batch_results_from_value(ops.clone(), results)?;
205            }
206        }
207        _ => {}
208    }
209    Ok(())
210}
211
212pub fn parse_batch_results_from_tar_gz(bytes: &[u8]) -> anyhow::Result<Vec<BatchOperationResult>> {
213    let decoder = GzDecoder::new(Cursor::new(bytes));
214    let mut archive = Archive::new(decoder);
215    let mut results = Vec::new();
216    for entry in archive.entries()? {
217        let mut entry = entry?;
218        if !entry.header().entry_type().is_file() {
219            continue;
220        }
221        let mut contents = String::new();
222        entry.read_to_string(&mut contents)?;
223        if contents.trim().is_empty() {
224            continue;
225        }
226        let value: serde_json::Value = serde_json::from_str(&contents)?;
227        extract_batch_results_from_value(value, &mut results)?;
228    }
229    Ok(results)
230}
231
232pub fn parse_operation_response_value(value: &serde_json::Value) -> Option<serde_json::Value> {
233    match value {
234        serde_json::Value::String(raw) => serde_json::from_str(raw).ok(),
235        other => Some(other.clone()),
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write;
    use tar::Header;

    /// Builds an in-memory `.tar.gz` with one regular file per `(name, contents)` pair.
    fn build_tar_gz(files: &[(&str, &str)]) -> Vec<u8> {
        let mut tar_buf = Vec::new();
        {
            let mut builder = tar::Builder::new(&mut tar_buf);
            for (name, contents) in files {
                let bytes = contents.as_bytes();
                let mut header = Header::new_gnu();
                header.set_path(name).expect("set path");
                header.set_size(bytes.len() as u64);
                header.set_mode(0o644);
                header.set_mtime(0);
                header.set_cksum();
                builder.append(&header, bytes).expect("append tar entry");
            }
            builder.finish().expect("finish tar");
        }
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder
            .write_all(&tar_buf)
            .expect("write tar to gzip encoder");
        encoder.finish().expect("finish gzip")
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_array_entries() {
        let json = r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"tags\":[]}" }]"#;
        let bytes = build_tar_gz(&[("result.json", json)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        let result = &results[0];
        assert_eq!(result.operation_id.as_deref(), Some("u1"));
        assert_eq!(result.status_code, 200);
        let parsed = parse_operation_response_value(&result.response).expect("parse response");
        assert!(parsed.get("tags").is_some());
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_multiple_files() {
        let part1 =
            r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }]"#;
        let part2 = r#"[{ "status_code": "204", "operation_id": "u2", "response": "{}" }]"#;
        let bytes = build_tar_gz(&[("part1.json", part1), ("part2.json", part2)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 2);
        let mut ids: Vec<_> = results
            .iter()
            .map(|r| r.operation_id.clone().unwrap_or_default())
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["u1".to_string(), "u2".to_string()]);
        let status_u2 = results
            .iter()
            .find(|r| r.operation_id.as_deref() == Some("u2"))
            .expect("result for u2");
        assert_eq!(status_u2.status_code, 204);
    }

    #[test]
    fn parse_batch_results_from_tar_gz_allows_paged_results() {
        let page1 = r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":1}" }]"#;
        let page2 = r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":2}" }]"#;
        let bytes = build_tar_gz(&[("page1.json", page1), ("page2.json", page2)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        let count = results
            .iter()
            .filter(|r| r.operation_id.as_deref() == Some("u1"))
            .count();
        assert_eq!(count, 2);
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_operations_wrapper() {
        let json = r#"
        {
            "operations": [
                { "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }
            ]
        }
        "#;
        let bytes = build_tar_gz(&[("ops.json", json)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].operation_id.as_deref(), Some("u1"));
    }

    #[test]
    fn parse_batch_results_from_tar_gz_skips_blank_files() {
        let json = r#"[{ "status_code": 200, "operation_id": "u1", "response": "{}" }]"#;
        let bytes = build_tar_gz(&[("blank.json", "  \n\t"), ("result.json", json)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].operation_id.as_deref(), Some("u1"));
    }

    #[test]
    fn parse_batch_results_from_tar_gz_ignores_entries_without_status_code() {
        let json =
            r#"[{ "operation_id": "no-status" }, { "status_code": 404, "operation_id": "u1" }]"#;
        let bytes = build_tar_gz(&[("result.json", json)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].operation_id.as_deref(), Some("u1"));
        assert_eq!(results[0].status_code, 404);
        // A missing `response` field is recorded as JSON null.
        assert!(results[0].response.is_null());
    }

    #[test]
    fn parse_batch_results_from_tar_gz_rejects_invalid_json() {
        let bytes = build_tar_gz(&[("broken.json", "not json")]);
        assert!(parse_batch_results_from_tar_gz(&bytes).is_err());
    }

    #[test]
    fn parse_operation_response_value_handles_string_json() {
        let value = serde_json::Value::String(r#"{"ok":true}"#.to_string());
        let parsed = parse_operation_response_value(&value).expect("parse response");
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
    }

    #[test]
    fn parse_operation_response_value_passes_through_non_string_values() {
        let value = serde_json::json!({ "ok": true });
        assert_eq!(parse_operation_response_value(&value), Some(value.clone()));
    }

    #[test]
    fn parse_operation_response_value_returns_none_for_invalid_json_string() {
        let value = serde_json::Value::String("not json".to_string());
        assert!(parse_operation_response_value(&value).is_none());
    }
}