// headless_lms_server/programs/mailchimp_syncer/batch_client.rs
use crate::prelude::*;
2use flate2::read::GzDecoder;
3use headless_lms_utils::http::REQWEST_CLIENT;
4use serde_json::json;
5use std::{
6 io::{Cursor, Read},
7 time::{Duration, Instant},
8};
9use tar::Archive;
10
/// A single operation inside a Mailchimp batch request.
///
/// Serialized into the `operations` array of a `POST /3.0/batches` call
/// (see [`submit_batch`]).
#[derive(Debug, Clone, Serialize)]
pub struct BatchOperation {
    /// HTTP method for the operation, e.g. "PUT" or "POST".
    pub method: String,
    /// Mailchimp API path the operation targets, relative to the API root.
    pub path: String,
    /// Request body sent with the operation, as a raw string.
    // NOTE(review): presumably a pre-serialized JSON document — confirm
    // against the callers that construct these operations.
    pub body: String,
    /// Optional client-chosen id used to correlate results with submitted
    /// operations; omitted from the serialized payload when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_id: Option<String>,
}
19
/// Final state of a Mailchimp batch, returned once polling sees it finish
/// (see [`poll_batch_until_finished_with_response_url`]).
#[derive(Debug)]
pub struct BatchPollResponse {
    /// URL of the archive containing per-operation results, when Mailchimp
    /// provides one in the batch status response.
    pub response_body_url: Option<String>,
    /// Number of operations that errored, as reported by Mailchimp.
    pub errored_operations: u64,
    /// Total number of operations in the batch, as reported by Mailchimp.
    pub total_operations: u64,
}
26
/// One operation's result, as extracted from a downloaded batch result
/// archive (see [`parse_batch_results_from_tar_gz`]).
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The `operation_id` supplied at submit time, if any was present in the
    /// result entry.
    pub operation_id: Option<String>,
    /// HTTP status code Mailchimp recorded for this operation.
    pub status_code: u16,
    /// The raw `response` field from the result entry; may be a JSON document
    /// encoded as a string (see [`parse_operation_response_value`]).
    pub response: serde_json::Value,
}
33
/// Maximum number of operations sent in a single batch request; larger inputs
/// are split into chunks of this size by [`submit_batch`].
pub const MAX_MAILCHIMP_BATCH_SIZE: usize = 500;
36
37pub async fn submit_batch(
39 server_prefix: &str,
40 access_token: &str,
41 operations: Vec<BatchOperation>,
42) -> anyhow::Result<Vec<String>> {
43 if operations.is_empty() {
44 return Ok(vec![]);
45 }
46 let total_chunks = operations.len().div_ceil(MAX_MAILCHIMP_BATCH_SIZE);
47 let mut batch_ids = Vec::new();
48 for (chunk_index, chunk) in operations.chunks(MAX_MAILCHIMP_BATCH_SIZE).enumerate() {
49 info!(
50 "Submitting batch with {} operations (chunk {}/{})",
51 chunk.len(),
52 chunk_index + 1,
53 total_chunks
54 );
55 let body = json!({ "operations": chunk });
56 let url = format!("https://{}.api.mailchimp.com/3.0/batches", server_prefix);
57 let response = REQWEST_CLIENT
58 .post(&url)
59 .header("Authorization", format!("apikey {}", access_token))
60 .json(&body)
61 .send()
62 .await?;
63 if !response.status().is_success() {
64 let error_text = response.text().await.unwrap_or_default();
65 return Err(anyhow::anyhow!(
66 "Mailchimp batch submit failed: {}",
67 error_text
68 ));
69 }
70 let data: serde_json::Value = response.json().await?;
71 let id = data["id"]
72 .as_str()
73 .ok_or_else(|| anyhow::anyhow!("Mailchimp batch response missing id"))?
74 .to_string();
75 batch_ids.push(id);
76 }
77 Ok(batch_ids)
78}
79
/// Polls a Mailchimp batch's status until it reaches a terminal state or
/// `timeout` elapses.
///
/// On `finished`, returns the batch's `response_body_url` (when present)
/// together with the errored/total operation counts from the status response.
///
/// # Errors
/// Fails when the batch status becomes `error` or `expired`, when `timeout`
/// is exceeded, on transport errors, or on a non-success HTTP status while
/// checking the batch.
pub async fn poll_batch_until_finished_with_response_url(
    server_prefix: &str,
    access_token: &str,
    batch_id: &str,
    timeout: Duration,
    poll_interval: Duration,
) -> anyhow::Result<BatchPollResponse> {
    let deadline = Instant::now() + timeout;
    let start = Instant::now();
    // Last minute mark we logged progress at, so slow batches are logged at
    // most once per minute.
    let mut last_logged_minute = 0u64;
    // NOTE: a tokio interval's first tick completes immediately, so the first
    // status check happens right away rather than after `poll_interval`.
    let mut interval = tokio::time::interval(poll_interval);
    loop {
        interval.tick().await;
        // Deadline is checked after the tick, so at least one poll always runs.
        if Instant::now() >= deadline {
            return Err(anyhow::anyhow!(
                "Mailchimp batch {} did not finish within {:?}",
                batch_id,
                timeout
            ));
        }
        let url = format!(
            "https://{}.api.mailchimp.com/3.0/batches/{}",
            server_prefix, batch_id
        );
        let response = REQWEST_CLIENT
            .get(&url)
            .header("Authorization", format!("apikey {}", access_token))
            .send()
            .await?;
        if !response.status().is_success() {
            let error_text = response.text().await.unwrap_or_default();
            return Err(anyhow::anyhow!(
                "Mailchimp batch status check failed: {}",
                error_text
            ));
        }
        let data: serde_json::Value = response.json().await?;
        let status = data["status"].as_str().unwrap_or("");
        let elapsed = start.elapsed();
        // After the first minute, log once per elapsed minute so long-running
        // batches remain visible without flooding the log on every poll.
        if elapsed.as_secs() >= 60 {
            let minute = elapsed.as_secs() / 60;
            if minute > last_logged_minute {
                last_logged_minute = minute;
                info!(
                    "Batch {} still processing after {}s (status: {})",
                    batch_id,
                    elapsed.as_secs(),
                    status
                );
            }
        }
        match status {
            "finished" => {
                let response_body_url = data["response_body_url"].as_str().map(|s| s.to_string());
                if let Some(ref url) = response_body_url {
                    info!("Batch {} results available at: {}", batch_id, url);
                }
                // Missing/non-numeric counts default to 0 rather than failing.
                let errored = data["errored_operations"].as_u64().unwrap_or(0);
                let total = data["total_operations"].as_u64().unwrap_or(0);
                if errored > 0 {
                    warn!(
                        "Mailchimp batch {} finished with {} errored operations out of {}",
                        batch_id, errored, total
                    );
                }
                return Ok(BatchPollResponse {
                    response_body_url,
                    errored_operations: errored,
                    total_operations: total,
                });
            }
            "error" | "expired" => {
                return Err(anyhow::anyhow!(
                    "Mailchimp batch {} ended with status: {}",
                    batch_id,
                    status
                ));
            }
            // Any other status (e.g. still pending/processing): keep polling.
            _ => {}
        }
    }
}
163
164fn extract_batch_results_from_value(
165 value: serde_json::Value,
166 results: &mut Vec<BatchOperationResult>,
167) -> anyhow::Result<()> {
168 match value {
169 serde_json::Value::Array(items) => {
170 for item in items {
171 extract_batch_results_from_value(item, results)?;
172 }
173 }
174 serde_json::Value::Object(map) => {
175 let status_code = if let Some(code) = map.get("status_code").and_then(|v| v.as_u64()) {
176 Some(code as u16)
177 } else if let Some(code) = map.get("status_code").and_then(|v| v.as_str()) {
178 code.parse::<u16>().ok()
179 } else {
180 None
181 };
182 if let Some(status_code) = status_code {
183 let operation_id = map
184 .get("operation_id")
185 .and_then(|v| v.as_str())
186 .map(|s| s.to_string());
187 let response = map
188 .get("response")
189 .cloned()
190 .unwrap_or(serde_json::Value::Null);
191 results.push(BatchOperationResult {
192 operation_id,
193 status_code,
194 response,
195 });
196 } else if let Some(ops) = map.get("operations") {
197 extract_batch_results_from_value(ops.clone(), results)?;
198 }
199 }
200 _ => {}
201 }
202 Ok(())
203}
204
205pub fn parse_batch_results_from_tar_gz(bytes: &[u8]) -> anyhow::Result<Vec<BatchOperationResult>> {
206 let decoder = GzDecoder::new(Cursor::new(bytes));
207 let mut archive = Archive::new(decoder);
208 let mut results = Vec::new();
209 for entry in archive.entries()? {
210 let mut entry = entry?;
211 if !entry.header().entry_type().is_file() {
212 continue;
213 }
214 let mut contents = String::new();
215 entry.read_to_string(&mut contents)?;
216 if contents.trim().is_empty() {
217 continue;
218 }
219 let value: serde_json::Value = serde_json::from_str(&contents)?;
220 extract_batch_results_from_value(value, &mut results)?;
221 }
222 Ok(results)
223}
224
225pub fn parse_operation_response_value(value: &serde_json::Value) -> Option<serde_json::Value> {
226 match value {
227 serde_json::Value::String(raw) => serde_json::from_str(raw).ok(),
228 other => Some(other.clone()),
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write;
    use tar::Header;

    /// Builds an in-memory gzipped tarball containing the given
    /// (file name, file contents) pairs, mimicking the archives Mailchimp
    /// serves from a batch's `response_body_url`.
    fn build_tar_gz(files: &[(&str, &str)]) -> Vec<u8> {
        let mut tar_buf = Vec::new();
        {
            let mut builder = tar::Builder::new(&mut tar_buf);
            for (name, contents) in files {
                let bytes = contents.as_bytes();
                let mut header = Header::new_gnu();
                header.set_path(name).expect("set path");
                header.set_size(bytes.len() as u64);
                header.set_mode(0o644);
                header.set_mtime(0);
                // Checksum must be computed after all other header fields are set.
                header.set_cksum();
                builder.append(&header, bytes).expect("append tar entry");
            }
            builder.finish().expect("finish tar");
        }
        // Gzip the finished tar stream.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder
            .write_all(&tar_buf)
            .expect("write tar to gzip encoder");
        encoder.finish().expect("finish gzip")
    }

    // A top-level JSON array of result objects is parsed, and a
    // string-encoded `response` can be decoded afterwards.
    #[test]
    fn parse_batch_results_from_tar_gz_reads_array_entries() {
        let json = r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"tags\":[]}" }]"#;
        let bytes = build_tar_gz(&[("result.json", json)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        let result = &results[0];
        assert_eq!(result.operation_id.as_deref(), Some("u1"));
        assert_eq!(result.status_code, 200);
        let parsed = parse_operation_response_value(&result.response).expect("parse response");
        assert!(parsed.get("tags").is_some());
    }

    // Results spread over multiple files in one archive are all collected;
    // a string-typed "status_code" ("204") is parsed numerically.
    #[test]
    fn parse_batch_results_from_tar_gz_reads_multiple_files() {
        let part1 =
            r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }]"#;
        let part2 = r#"[{ "status_code": "204", "operation_id": "u2", "response": "{}" }]"#;
        let bytes = build_tar_gz(&[("part1.json", part1), ("part2.json", part2)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 2);
        let mut ids: Vec<_> = results
            .iter()
            .map(|r| r.operation_id.clone().unwrap_or_default())
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["u1".to_string(), "u2".to_string()]);
        let status_u2 = results
            .iter()
            .find(|r| r.operation_id.as_deref() == Some("u2"));
        assert_eq!(status_u2.unwrap().status_code, 204);
    }

    // The same operation_id appearing in multiple files (paged results) is
    // kept once per occurrence, not deduplicated.
    #[test]
    fn parse_batch_results_from_tar_gz_allows_paged_results() {
        let page1 = r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":1}" }]"#;
        let page2 = r#"[{ "status_code": 200, "operation_id": "u1", "response": "{\"page\":2}" }]"#;
        let bytes = build_tar_gz(&[("page1.json", page1), ("page2.json", page2)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        let count = results
            .iter()
            .filter(|r| r.operation_id.as_deref() == Some("u1"))
            .count();
        assert_eq!(count, 2);
    }

    // An object wrapping results in an "operations" array is unwrapped.
    #[test]
    fn parse_batch_results_from_tar_gz_reads_operations_wrapper() {
        let json = r#"
        {
            "operations": [
                { "status_code": 200, "operation_id": "u1", "response": "{\"ok\":true}" }
            ]
        }
        "#;
        let bytes = build_tar_gz(&[("ops.json", json)]);
        let results = parse_batch_results_from_tar_gz(&bytes).expect("parse tar.gz");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].operation_id.as_deref(), Some("u1"));
    }

    // A response stored as a JSON-encoded string is decoded into a value.
    #[test]
    fn parse_operation_response_value_handles_string_json() {
        let value = serde_json::Value::String(r#"{"ok":true}"#.to_string());
        let parsed = parse_operation_response_value(&value).expect("parse response");
        assert_eq!(parsed.get("ok").and_then(|v| v.as_bool()), Some(true));
    }
}