headless_lms_server/programs/mailchimp_syncer/
mailchimp_ops.rs1use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
2use headless_lms_utils::http::REQWEST_CLIENT;
3use reqwest::Method;
4use serde_json::Value;
5use std::collections::HashMap;
6use std::time::Duration;
7
8use super::batch_client::{
9 BatchOperation, parse_batch_results_from_tar_gz, parse_operation_response_value,
10 poll_batch_until_finished_with_response_url, submit_batch,
11};
12
13const DEFAULT_BATCH_THRESHOLD: usize = 5;
14
15#[derive(Debug, Clone)]
16pub struct MailchimpOperation {
17 pub method: Method,
18 pub path: String,
19 pub body: Option<Value>,
20 pub operation_id: Option<String>,
21}
22
23#[derive(Debug, Clone)]
24pub struct MailchimpOpResult {
25 pub operation_id: Option<String>,
26 pub status_code: u16,
27 pub response_raw: Value,
28 pub response_json: Option<Value>,
29 pub error: Option<String>,
30}
31
32impl MailchimpOpResult {
33 pub fn is_success(&self) -> bool {
34 (200..=299).contains(&self.status_code) && self.error.is_none()
35 }
36}
37
38pub struct MailchimpExecutor {
39 batch_threshold: usize,
40 timeout: Duration,
41 poll_interval: Duration,
42}
43
44impl MailchimpExecutor {
45 pub fn new(timeout: Duration, poll_interval: Duration) -> Self {
47 Self {
48 batch_threshold: DEFAULT_BATCH_THRESHOLD,
49 timeout,
50 poll_interval,
51 }
52 }
53
54 pub async fn execute(
55 &self,
56 token: &MarketingMailingListAccessToken,
57 ops: Vec<MailchimpOperation>,
58 ) -> anyhow::Result<Vec<MailchimpOpResult>> {
59 if ops.is_empty() {
60 return Ok(vec![]);
61 }
62 if ops.len() < self.batch_threshold {
63 Ok(execute_direct(token, ops).await)
64 } else {
65 execute_batch(token, ops, self.timeout, self.poll_interval).await
66 }
67 }
68}
69
70async fn execute_direct(
71 token: &MarketingMailingListAccessToken,
72 ops: Vec<MailchimpOperation>,
73) -> Vec<MailchimpOpResult> {
74 let mut results = Vec::with_capacity(ops.len());
75 for op in ops {
76 let url = format!(
77 "https://{}.api.mailchimp.com/3.0{}",
78 token.server_prefix, op.path
79 );
80 let mut request = REQWEST_CLIENT
81 .request(op.method.clone(), &url)
82 .header("Authorization", format!("apikey {}", token.access_token));
83 if let Some(body) = op.body.clone() {
84 request = request.json(&body);
85 }
86 let response = match request.send().await {
87 Ok(resp) => resp,
88 Err(err) => {
89 results.push(MailchimpOpResult {
90 operation_id: op.operation_id.clone(),
91 status_code: 0,
92 response_raw: Value::Null,
93 response_json: None,
94 error: Some(format!("Transport error: {}", err)),
95 });
96 continue;
97 }
98 };
99
100 let status_code = response.status().as_u16();
101 let body_bytes = match response.bytes().await {
102 Ok(bytes) => bytes,
103 Err(err) => {
104 results.push(MailchimpOpResult {
105 operation_id: op.operation_id.clone(),
106 status_code,
107 response_raw: Value::Null,
108 response_json: None,
109 error: Some(format!("Body read error: {}", err)),
110 });
111 continue;
112 }
113 };
114
115 let response_raw = match serde_json::from_slice::<Value>(&body_bytes) {
116 Ok(json) => json,
117 Err(_) => Value::String(String::from_utf8_lossy(&body_bytes).to_string()),
118 };
119 let response_json = parse_operation_response_value(&response_raw);
120 results.push(MailchimpOpResult {
121 operation_id: op.operation_id.clone(),
122 status_code,
123 response_raw,
124 response_json,
125 error: None,
126 });
127 }
128 results
129}
130
131async fn execute_batch(
132 token: &MarketingMailingListAccessToken,
133 ops: Vec<MailchimpOperation>,
134 timeout: Duration,
135 poll_interval: Duration,
136) -> anyhow::Result<Vec<MailchimpOpResult>> {
137 if ops.is_empty() {
138 return Ok(vec![]);
139 }
140
141 let batch_ops: Vec<BatchOperation> = ops
142 .iter()
143 .map(|op| BatchOperation {
144 method: op.method.as_str().to_string(),
145 path: op.path.clone(),
146 body: op.body.as_ref().map(|b| b.to_string()).unwrap_or_default(),
147 operation_id: op.operation_id.clone(),
148 })
149 .collect();
150
151 let batch_ids = submit_batch(&token.server_prefix, &token.access_token, batch_ops).await?;
152
153 let mut results = Vec::new();
154 for batch_id in &batch_ids {
155 let poll_result = poll_batch_until_finished_with_response_url(
156 &token.server_prefix,
157 &token.access_token,
158 batch_id,
159 timeout,
160 poll_interval,
161 )
162 .await?;
163 if poll_result.total_operations > 0 {
164 info!(
165 "Mailchimp batch {} finished with {} errored operations out of {}",
166 batch_id, poll_result.errored_operations, poll_result.total_operations
167 );
168 }
169 let response_url = match poll_result.response_body_url {
170 Some(url) => url,
171 None => {
172 return Err(anyhow::anyhow!(
173 "Mailchimp batch {} finished without response_body_url",
174 batch_id
175 ));
176 }
177 };
178 let response = REQWEST_CLIENT.get(&response_url).send().await?;
179 if !response.status().is_success() {
180 let error_text = response.text().await.unwrap_or_default();
181 return Err(anyhow::anyhow!(
182 "Failed to fetch Mailchimp batch results: {}",
183 error_text
184 ));
185 }
186 let bytes = response.bytes().await?;
187 let batch_results = parse_batch_results_from_tar_gz(&bytes)?;
188 for result in batch_results {
189 let response_json = parse_operation_response_value(&result.response);
190 results.push(MailchimpOpResult {
191 operation_id: result.operation_id.clone(),
192 status_code: result.status_code,
193 response_raw: result.response,
194 response_json,
195 error: None,
196 });
197 }
198 }
199
200 Ok(order_results(&ops, results))
201}
202
203fn order_results(
204 ops: &[MailchimpOperation],
205 results: Vec<MailchimpOpResult>,
206) -> Vec<MailchimpOpResult> {
207 let mut ordered = Vec::new();
208 let mut results_by_id: HashMap<String, Vec<MailchimpOpResult>> = HashMap::new();
209 let mut unmatched = Vec::new();
210
211 for result in results {
212 if let Some(ref op_id) = result.operation_id {
213 results_by_id.entry(op_id.clone()).or_default().push(result);
214 } else {
215 unmatched.push(result);
216 }
217 }
218
219 for op in ops {
220 if let Some(ref op_id) = op.operation_id
221 && let Some(mut entries) = results_by_id.remove(op_id)
222 {
223 ordered.append(&mut entries);
224 }
225 }
226
227 for (_, mut entries) in results_by_id {
228 ordered.append(&mut entries);
229 }
230
231 ordered.extend(unmatched);
232 ordered
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238
239 #[test]
240 fn order_results_respects_input_order() {
241 let ops = vec![
242 MailchimpOperation {
243 method: Method::GET,
244 path: "/a".to_string(),
245 body: None,
246 operation_id: Some("u1".to_string()),
247 },
248 MailchimpOperation {
249 method: Method::GET,
250 path: "/b".to_string(),
251 body: None,
252 operation_id: Some("u2".to_string()),
253 },
254 ];
255 let results = vec![
256 MailchimpOpResult {
257 operation_id: Some("u2".to_string()),
258 status_code: 200,
259 response_raw: Value::Null,
260 response_json: None,
261 error: None,
262 },
263 MailchimpOpResult {
264 operation_id: Some("u1".to_string()),
265 status_code: 200,
266 response_raw: Value::Null,
267 response_json: None,
268 error: None,
269 },
270 ];
271 let ordered = order_results(&ops, results);
272 assert_eq!(ordered[0].operation_id.as_deref(), Some("u1"));
273 assert_eq!(ordered[1].operation_id.as_deref(), Some("u2"));
274 }
275}