// headless_lms_server/programs/mailchimp_syncer/batch_client.rs

1use crate::prelude::*;
2use flate2::read::GzDecoder;
3use headless_lms_utils::http::REQWEST_CLIENT;
4use serde_json::json;
5use std::{
6    io::{Cursor, Read},
7    time::{Duration, Instant},
8};
9use tar::Archive;
10
/// One operation inside a Mailchimp `POST /batches` request.
///
/// `submit_batch` serializes a slice of these as the request's `"operations"` array, so the
/// field names below become the JSON keys Mailchimp receives.
#[derive(Debug, Clone, Serialize)]
pub struct BatchOperation {
    /// HTTP method of the individual operation.
    pub method: String,
    /// API path of the individual operation (presumably relative to `/3.0`; confirm with callers).
    pub path: String,
    /// Request body of the operation, passed as an already-encoded string.
    pub body: String,
    /// Optional identifier used to match this operation to its entry in the batch results
    /// (see `BatchOperationResult::operation_id`). Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_id: Option<String>,
}
19
/// Outcome of a finished Mailchimp batch, as reported by `GET /batches/{batch_id}`.
///
/// Built by `poll_batch_until_finished_with_response_url`, which uses 0 for either counter
/// when Mailchimp omits it.
#[derive(Debug)]
pub struct BatchPollResponse {
    /// The batch's `response_body_url`, when present (presumably the `.tar.gz` consumed by
    /// `parse_batch_results_from_tar_gz`; confirm with callers).
    pub response_body_url: Option<String>,
    /// Mailchimp's `errored_operations` counter for the batch.
    pub errored_operations: u64,
    /// Mailchimp's `total_operations` counter for the batch.
    pub total_operations: u64,
}
26
/// One operation's outcome, extracted from a batch result archive by
/// `parse_batch_results_from_tar_gz`.
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The `operation_id` reported for this operation, if it was a string.
    pub operation_id: Option<String>,
    /// HTTP status code of the individual operation.
    pub status_code: u16,
    /// Raw `response` field (`null` when missing). It may be a JSON document encoded inside a
    /// string; `parse_operation_response_value` decodes that form.
    pub response: serde_json::Value,
}
33
/// Maximum number of items sent to Mailchimp in one request.
///
/// Mailchimp caps both batch operations and batch subscribe at 500 items/members per request;
/// `submit_batch` splits larger workloads into chunks of this size.
pub const MAX_MAILCHIMP_BATCH_SIZE: usize = 500;
36
37/// Submits operations to Mailchimp POST /batches. Chunks at MAX_MAILCHIMP_BATCH_SIZE. Returns batch IDs.
38pub async fn submit_batch(
39    server_prefix: &str,
40    access_token: &str,
41    operations: Vec<BatchOperation>,
42) -> anyhow::Result<Vec<String>> {
43    if operations.is_empty() {
44        return Ok(vec![]);
45    }
46    let total_chunks = operations.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
47    let mut batch_ids = Vec::new();
48    for (chunk_index, chunk) in operations.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
49        info!(
50            "Submitting batch with {} operations (chunk {}/{})",
51            chunk.len(),
52            chunk_index + 1,
53            total_chunks
54        );
55        let body = json!({ "operations": chunk });
56        let url = format!("https://{}.api.mailchimp.com/3.0/batches", server_prefix);
57        let response = REQWEST_CLIENT
58            .post(&url)
59            .header("Authorization", format!("apikey {}", access_token))
60            .json(&body)
61            .send()
62            .await?;
63        if !response.status().is_success() {
64            let error_text = response.text().await.unwrap_or_default();
65            return Err(anyhow::anyhow!(
66                "Mailchimp batch submit failed: {}",
67                error_text
68            ));
69        }
70        let data: serde_json::Value = response.json().await?;
71        let id = data["id"]
72            .as_str()
73            .ok_or_else(|| anyhow::anyhow!("Mailchimp batch response missing id"))?
74            .to_string();
75        batch_ids.push(id);
76    }
77    Ok(batch_ids)
78}
79
80/// Polls GET /batches/{batch_id} until status is "finished", or timeout. Returns response_body_url and error counts.
81pub async fn poll_batch_until_finished_with_response_url(
82    server_prefix: &str,
83    access_token: &str,
84    batch_id: &str,
85    timeout: Duration,
86    poll_interval: Duration,
87) -> anyhow::Result<BatchPollResponse> {
88    let deadline = Instant::now() + timeout;
89    let start = Instant::now();
90    let mut last_logged_minute = 0u64;
91    let mut interval = tokio::time::interval(poll_interval);
92    loop {
93        interval.tick().await;
94        if Instant::now() >= deadline {
95            return Err(anyhow::anyhow!(
96                "Mailchimp batch {} did not finish within {:?}",
97                batch_id,
98                timeout
99            ));
100        }
101        let url = format!(
102            "https://{}.api.mailchimp.com/3.0/batches/{}",
103            server_prefix, batch_id
104        );
105        let response = REQWEST_CLIENT
106            .get(&url)
107            .header("Authorization", format!("apikey {}", access_token))
108            .send()
109            .await?;
110        if !response.status().is_success() {
111            let error_text = response.text().await.unwrap_or_default();
112            return Err(anyhow::anyhow!(
113                "Mailchimp batch status check failed: {}",
114                error_text
115            ));
116        }
117        let data: serde_json::Value = response.json().await?;
118        let status = data["status"].as_str().unwrap_or("");
119        let elapsed = start.elapsed();
120        if elapsed.as_secs() >= 60 {
121            let minute = elapsed.as_secs() / 60;
122            if minute > last_logged_minute {
123                last_logged_minute = minute;
124                info!(
125                    "Batch {} still processing after {}s (status: {})",
126                    batch_id,
127                    elapsed.as_secs(),
128                    status
129                );
130            }
131        }
132        match status {
133            "finished" => {
134                let response_body_url = data["response_body_url"].as_str().map(|s| s.to_string());
135                if let Some(ref url) = response_body_url {
136                    info!("Batch {} results available at: {}", batch_id, url);
137                }
138                let errored = data["errored_operations"].as_u64().unwrap_or(0);
139                let total = data["total_operations"].as_u64().unwrap_or(0);
140                if errored > 0 {
141                    warn!(
142                        "Mailchimp batch {} finished with {} errored operations out of {}",
143                        batch_id, errored, total
144                    );
145                }
146                return Ok(BatchPollResponse {
147                    response_body_url,
148                    errored_operations: errored,
149                    total_operations: total,
150                });
151            }
152            "error" | "expired" => {
153                return Err(anyhow::anyhow!(
154                    "Mailchimp batch {} ended with status: {}",
155                    batch_id,
156                    status
157                ));
158            }
159            _ => {}
160        }
161    }
162}
163
164fn extract_batch_results_from_value(
165    value: serde_json::Value,
166    results: &mut Vec<BatchOperationResult>,
167) -> anyhow::Result<()> {
168    match value {
169        serde_json::Value::Array(items) => {
170            for item in items {
171                extract_batch_results_from_value(item, results)?;
172            }
173        }
174        serde_json::Value::Object(map) => {
175            let status_code = if let Some(code) = map.get("status_code").and_then(|v| v.as_u64()) {
176                Some(code as u16)
177            } else if let Some(code) = map.get("status_code").and_then(|v| v.as_str()) {
178                code.parse::<u16>().ok()
179            } else {
180                None
181            };
182            if let Some(status_code) = status_code {
183                let operation_id = map
184                    .get("operation_id")
185                    .and_then(|v| v.as_str())
186                    .map(|s| s.to_string());
187                let response = map
188                    .get("response")
189                    .cloned()
190                    .unwrap_or(serde_json::Value::Null);
191                results.push(BatchOperationResult {
192                    operation_id,
193                    status_code,
194                    response,
195                });
196            } else if let Some(ops) = map.get("operations") {
197                extract_batch_results_from_value(ops.clone(), results)?;
198            }
199        }
200        _ => {}
201    }
202    Ok(())
203}
204
205pub fn parse_batch_results_from_tar_gz(bytes: &[u8]) -> anyhow::Result<Vec<BatchOperationResult>> {
206    let decoder = GzDecoder::new(Cursor::new(bytes));
207    let mut archive = Archive::new(decoder);
208    let mut results = Vec::new();
209    for entry in archive.entries()? {
210        let mut entry = entry?;
211        if !entry.header().entry_type().is_file() {
212            continue;
213        }
214        let mut contents = String::new();
215        entry.read_to_string(&mut contents)?;
216        if contents.trim().is_empty() {
217            continue;
218        }
219        let value: serde_json::Value = serde_json::from_str(&contents)?;
220        extract_batch_results_from_value(value, &mut results)?;
221    }
222    Ok(results)
223}
224
225pub fn parse_operation_response_value(value: &serde_json::Value) -> Option<serde_json::Value> {
226    match value {
227        serde_json::Value::String(raw) => serde_json::from_str(raw).ok(),
228        other => Some(other.clone()),
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use tar::Header;

    /// Builds an in-memory `.tar.gz` containing one regular file per `(name, contents)` pair.
    /// The tar stream is written straight into the gzip encoder.
    fn build_tar_gz(files: &[(&str, &str)]) -> Vec<u8> {
        let gzip = GzEncoder::new(Vec::new(), Compression::default());
        let mut archive = tar::Builder::new(gzip);
        for &(name, contents) in files {
            let mut header = Header::new_gnu();
            header.set_size(contents.len() as u64);
            header.set_mode(0o644);
            header.set_mtime(0);
            // `append_data` sets the path and checksum on the header before writing it.
            archive
                .append_data(&mut header, name, contents.as_bytes())
                .expect("append tar entry");
        }
        archive
            .into_inner()
            .expect("finish tar")
            .finish()
            .expect("finish gzip")
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_array_entries() {
        let payload =
            r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"tags\":[]}" }]"#;
        let archive = build_tar_gz(&[("result.json", payload)]);
        let results = parse_batch_results_from_tar_gz(&archive).expect("parse tar.gz");
        let [only] = results.as_slice() else {
            panic!("expected exactly one result, got {}", results.len());
        };
        assert_eq!(only.operation_id.as_deref(), Some("u1"));
        assert_eq!(only.status_code, 200);
        let decoded = parse_operation_response_value(&only.response).expect("parse response");
        assert!(decoded.get("tags").is_some());
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_multiple_files() {
        let first =
            r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }]"#;
        let second = r#"[{ "status_code": "204", "operation_id": "u2", "response": "{}" }]"#;
        let archive = build_tar_gz(&[("part1.json", first), ("part2.json", second)]);
        let results = parse_batch_results_from_tar_gz(&archive).expect("parse tar.gz");
        assert_eq!(results.len(), 2);
        let mut ids: Vec<&str> = results
            .iter()
            .map(|r| r.operation_id.as_deref().unwrap_or_default())
            .collect();
        ids.sort_unstable();
        assert_eq!(ids, ["u1", "u2"]);
        let second_result = results
            .iter()
            .find(|r| r.operation_id.as_deref() == Some("u2"))
            .expect("result for u2");
        assert_eq!(second_result.status_code, 204);
    }

    #[test]
    fn parse_batch_results_from_tar_gz_allows_paged_results() {
        let pages = [
            (
                "page1.json",
                r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":1}" }]"#,
            ),
            (
                "page2.json",
                r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":2}" }]"#,
            ),
        ];
        let archive = build_tar_gz(&pages);
        let results = parse_batch_results_from_tar_gz(&archive).expect("parse tar.gz");
        let u1_entries = results
            .iter()
            .filter(|r| r.operation_id.as_deref() == Some("u1"))
            .count();
        assert_eq!(u1_entries, 2);
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_operations_wrapper() {
        let wrapped = r#"
        {
            "operations": [
                { "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }
            ]
        }
        "#;
        let archive = build_tar_gz(&[("ops.json", wrapped)]);
        let results = parse_batch_results_from_tar_gz(&archive).expect("parse tar.gz");
        let [only] = results.as_slice() else {
            panic!("expected exactly one result, got {}", results.len());
        };
        assert_eq!(only.operation_id.as_deref(), Some("u1"));
    }

    #[test]
    fn parse_operation_response_value_handles_string_json() {
        let encoded = serde_json::Value::from(r#"{"ok":true}"#);
        let decoded = parse_operation_response_value(&encoded).expect("parse response");
        assert_eq!(decoded["ok"], serde_json::Value::Bool(true));
    }
}