headless_lms_server/programs/mailchimp_syncer/
batch_client.rs1use crate::prelude::*;
2use flate2::read::GzDecoder;
3use headless_lms_utils::http::REQWEST_CLIENT;
4use secrecy::ExposeSecret;
5use serde_json::json;
6use std::{
7 io::{Cursor, Read},
8 time::{Duration, Instant},
9};
10use tar::Archive;
11
12#[derive(Debug, Clone, Serialize)]
13pub struct BatchOperation {
14 pub method: String,
15 pub path: String,
16 pub body: String,
17 #[serde(skip_serializing_if = "Option::is_none")]
18 pub operation_id: Option<String>,
19}
20
/// Summary of a batch that reached status `"finished"`, as returned by
/// [`poll_batch_until_finished_with_response_url`].
#[derive(Debug)]
pub struct BatchPollResponse {
    /// The batch's `response_body_url`, or `None` when the status payload had no string there.
    pub response_body_url: Option<String>,
    /// The batch's `errored_operations` count (0 when missing or not an unsigned integer).
    pub errored_operations: u64,
    /// The batch's `total_operations` count (0 when missing or not an unsigned integer).
    pub total_operations: u64,
}
27
28#[derive(Debug)]
29pub struct BatchOperationResult {
30 pub operation_id: Option<String>,
31 pub status_code: u16,
32 pub response: serde_json::Value,
33}
34
/// Upper bound on the number of operations [`submit_batch`] places in a single Mailchimp batch;
/// larger inputs are split into several batches of at most this size.
pub const MAX_MAILCHIMP_BATCH_SIZE: usize = 500;
37
38pub async fn submit_batch(
40 server_prefix: &str,
41 access_token: &DbSecret,
42 operations: Vec<BatchOperation>,
43) -> anyhow::Result<Vec<String>> {
44 if operations.is_empty() {
45 return Ok(vec![]);
46 }
47 let total_chunks = operations.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
48 let mut batch_ids = Vec::new();
49 for (chunk_index, chunk) in operations.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
50 info!(
51 "Submitting batch with {} operations (chunk {}/{})",
52 chunk.len(),
53 chunk_index + 1,
54 total_chunks
55 );
56 let body = json!({ "operations": chunk });
57 let url = format!("https://{}.api.mailchimp.com/3.0/batches", server_prefix);
58 let response = REQWEST_CLIENT
59 .post(&url)
60 .header(
61 "Authorization",
62 format!("apikey {}", access_token.expose_secret()),
63 )
64 .json(&body)
65 .send()
66 .await?;
67 if !response.status().is_success() {
68 let error_text = response.text().await.unwrap_or_default();
69 return Err(anyhow::anyhow!(
70 "Mailchimp batch submit failed: {}",
71 error_text
72 ));
73 }
74 let data: serde_json::Value = response.json().await?;
75 let id = data["id"]
76 .as_str()
77 .ok_or_else(|| anyhow::anyhow!("Mailchimp batch response missing id"))?
78 .to_string();
79 batch_ids.push(id);
80 }
81 Ok(batch_ids)
82}
83
84pub async fn poll_batch_until_finished_with_response_url(
86 server_prefix: &str,
87 access_token: &DbSecret,
88 batch_id: &str,
89 timeout: Duration,
90 poll_interval: Duration,
91) -> anyhow::Result<BatchPollResponse> {
92 let deadline = Instant::now() + timeout;
93 let start = Instant::now();
94 let mut last_logged_minute = 0u64;
95 let mut interval = tokio::time::interval(poll_interval);
96 loop {
97 interval.tick().await;
98 if Instant::now() >= deadline {
99 return Err(anyhow::anyhow!(
100 "Mailchimp batch {} did not finish within {:?}",
101 batch_id,
102 timeout
103 ));
104 }
105 let url = format!(
106 "https://{}.api.mailchimp.com/3.0/batches/{}",
107 server_prefix, batch_id
108 );
109 let response = REQWEST_CLIENT
110 .get(&url)
111 .header(
112 "Authorization",
113 format!("apikey {}", access_token.expose_secret()),
114 )
115 .send()
116 .await?;
117 if !response.status().is_success() {
118 let error_text = response.text().await.unwrap_or_default();
119 return Err(anyhow::anyhow!(
120 "Mailchimp batch status check failed: {}",
121 error_text
122 ));
123 }
124 let data: serde_json::Value = response.json().await?;
125 let status = data["status"].as_str().unwrap_or("");
126 let elapsed = start.elapsed();
127 if elapsed.as_secs() >= 60 {
128 let minute = elapsed.as_secs() / 60;
129 if minute > last_logged_minute {
130 last_logged_minute = minute;
131 info!(
132 "Batch {} still processing after {}s (status: {})",
133 batch_id,
134 elapsed.as_secs(),
135 status
136 );
137 }
138 }
139 match status {
140 "finished" => {
141 let response_body_url = data["response_body_url"].as_str().map(|s| s.to_string());
142 if let Some(ref url) = response_body_url {
143 info!("Batch {} results available at: {}", batch_id, url);
144 }
145 let errored = data["errored_operations"].as_u64().unwrap_or(0);
146 let total = data["total_operations"].as_u64().unwrap_or(0);
147 if errored > 0 {
148 warn!(
149 "Mailchimp batch {} finished with {} errored operations out of {}",
150 batch_id, errored, total
151 );
152 }
153 return Ok(BatchPollResponse {
154 response_body_url,
155 errored_operations: errored,
156 total_operations: total,
157 });
158 }
159 "error" | "expired" => {
160 return Err(anyhow::anyhow!(
161 "Mailchimp batch {} ended with status: {}",
162 batch_id,
163 status
164 ));
165 }
166 _ => {}
167 }
168 }
169}
170
171fn extract_batch_results_from_value(
172 value: serde_json::Value,
173 results: &mut Vec<BatchOperationResult>,
174) -> anyhow::Result<()> {
175 match value {
176 serde_json::Value::Array(items) => {
177 for item in items {
178 extract_batch_results_from_value(item, results)?;
179 }
180 }
181 serde_json::Value::Object(map) => {
182 let status_code = if let Some(code) = map.get("status_code").and_then(|v| v.as_u64()) {
183 Some(code as u16)
184 } else if let Some(code) = map.get("status_code").and_then(|v| v.as_str()) {
185 code.parse::<u16>().ok()
186 } else {
187 None
188 };
189 if let Some(status_code) = status_code {
190 let operation_id = map
191 .get("operation_id")
192 .and_then(|v| v.as_str())
193 .map(|s| s.to_string());
194 let response = map
195 .get("response")
196 .cloned()
197 .unwrap_or(serde_json::Value::Null);
198 results.push(BatchOperationResult {
199 operation_id,
200 status_code,
201 response,
202 });
203 } else if let Some(ops) = map.get("operations") {
204 extract_batch_results_from_value(ops.clone(), results)?;
205 }
206 }
207 _ => {}
208 }
209 Ok(())
210}
211
212pub fn parse_batch_results_from_tar_gz(bytes: &[u8]) -> anyhow::Result<Vec<BatchOperationResult>> {
213 let decoder = GzDecoder::new(Cursor::new(bytes));
214 let mut archive = Archive::new(decoder);
215 let mut results = Vec::new();
216 for entry in archive.entries()? {
217 let mut entry = entry?;
218 if !entry.header().entry_type().is_file() {
219 continue;
220 }
221 let mut contents = String::new();
222 entry.read_to_string(&mut contents)?;
223 if contents.trim().is_empty() {
224 continue;
225 }
226 let value: serde_json::Value = serde_json::from_str(&contents)?;
227 extract_batch_results_from_value(value, &mut results)?;
228 }
229 Ok(results)
230}
231
232pub fn parse_operation_response_value(value: &serde_json::Value) -> Option<serde_json::Value> {
233 match value {
234 serde_json::Value::String(raw) => serde_json::from_str(raw).ok(),
235 other => Some(other.clone()),
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::{Compression, write::GzEncoder};
    use std::io::Write;
    use tar::Header;

    /// Packs `(path, contents)` pairs into an in-memory gzip-compressed tar archive.
    fn build_tar_gz(files: &[(&str, &str)]) -> Vec<u8> {
        let mut builder = tar::Builder::new(Vec::new());
        for &(name, contents) in files {
            let mut header = Header::new_gnu();
            header.set_path(name).expect("set path");
            header.set_size(contents.len() as u64);
            header.set_mode(0o644);
            header.set_mtime(0);
            header.set_cksum();
            builder
                .append(&header, contents.as_bytes())
                .expect("append tar entry");
        }
        // `into_inner` writes the end-of-archive marker before handing back the buffer.
        let tar_bytes = builder.into_inner().expect("finish tar");
        let mut gz = GzEncoder::new(Vec::new(), Compression::default());
        gz.write_all(&tar_bytes)
            .expect("write tar to gzip encoder");
        gz.finish().expect("finish gzip")
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_array_entries() {
        let archive = build_tar_gz(&[(
            "result.json",
            r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"tags\":[]}" }]"#,
        )]);
        let results = parse_batch_results_from_tar_gz(&archive).expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        let only = &results[0];
        assert_eq!(only.operation_id.as_deref(), Some("u1"));
        assert_eq!(only.status_code, 200);
        let response = parse_operation_response_value(&only.response).expect("parse response");
        assert!(response.get("tags").is_some());
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_multiple_files() {
        let archive = build_tar_gz(&[
            (
                "part1.json",
                r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }]"#,
            ),
            (
                "part2.json",
                r#"[{ "status_code": "204", "operation_id": "u2", "response": "{}" }]"#,
            ),
        ]);
        let results = parse_batch_results_from_tar_gz(&archive).expect("parse tar.gz");
        assert_eq!(results.len(), 2);
        let mut ids: Vec<String> = results
            .iter()
            .map(|r| r.operation_id.as_deref().unwrap_or_default().to_owned())
            .collect();
        ids.sort();
        assert_eq!(ids, ["u1", "u2"]);
        let u2 = results
            .iter()
            .find(|r| r.operation_id.as_deref() == Some("u2"))
            .expect("result for u2");
        assert_eq!(u2.status_code, 204);
    }

    #[test]
    fn parse_batch_results_from_tar_gz_allows_paged_results() {
        let pages = [
            (
                "page1.json",
                r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":1}" }]"#,
            ),
            (
                "page2.json",
                r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":2}" }]"#,
            ),
        ];
        let results =
            parse_batch_results_from_tar_gz(&build_tar_gz(&pages)).expect("parse tar.gz");
        let u1_count = results
            .iter()
            .filter(|r| r.operation_id.as_deref() == Some("u1"))
            .count();
        assert_eq!(u1_count, 2);
    }

    #[test]
    fn parse_batch_results_from_tar_gz_reads_operations_wrapper() {
        let wrapped = r#"
            {
                "operations": [
                    { "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }
                ]
            }
        "#;
        let results = parse_batch_results_from_tar_gz(&build_tar_gz(&[("ops.json", wrapped)]))
            .expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].operation_id.as_deref(), Some("u1"));
    }

    #[test]
    fn parse_operation_response_value_handles_string_json() {
        let encoded = serde_json::Value::String(r#"{"ok":true}"#.to_string());
        let decoded = parse_operation_response_value(&encoded).expect("parse response");
        assert_eq!(decoded.get("ok").and_then(|v| v.as_bool()), Some(true));
    }
}