1use std::error::Error;
2use std::fmt;
3use std::fs::File;
4use std::io::{self, ErrorKind};
5use std::time::{Duration, Instant};
6
7#[cfg(unix)]
8mod raw {
9 use crate::posix;
10 use std::cmp::min;
11 use std::fs::File;
12 use std::io::{self, Read, Write};
13 use std::time::{Duration, Instant};
14
15 fn as_pollfd<'a>(f: Option<&'a File>, for_read: bool) -> posix::PollFd<'a> {
16 let events = if for_read {
17 posix::POLLIN
18 } else {
19 posix::POLLOUT
20 };
21 posix::PollFd::new(f, events)
22 }
23
24 fn maybe_poll(
25 fin: Option<&File>,
26 fout: Option<&File>,
27 ferr: Option<&File>,
28 deadline: Option<Instant>,
29 ) -> io::Result<(bool, bool, bool)> {
30 if deadline.is_none() {
35 match (&fin, &fout, &ferr) {
36 (None, None, Some(..)) => return Ok((false, false, true)),
37 (None, Some(..), None) => return Ok((false, true, false)),
38 (Some(..), None, None) => return Ok((true, false, false)),
39 _ => (),
40 }
41 }
42
43 let timeout = deadline.map(|deadline| {
44 let now = Instant::now();
45 if now >= deadline {
46 Duration::from_secs(0)
47 } else {
48 deadline - now
49 }
50 });
51
52 let mut fds = [
53 as_pollfd(fin, false),
54 as_pollfd(fout, true),
55 as_pollfd(ferr, true),
56 ];
57 posix::poll(&mut fds, timeout)?;
58
59 Ok((
60 fds[0].test(posix::POLLOUT | posix::POLLHUP),
61 fds[1].test(posix::POLLIN | posix::POLLHUP),
62 fds[2].test(posix::POLLIN | posix::POLLHUP),
63 ))
64 }
65
66 #[derive(Debug)]
67 pub struct RawCommunicator {
68 stdin: Option<File>,
69 stdout: Option<File>,
70 stderr: Option<File>,
71 input_data: Vec<u8>,
72 input_pos: usize,
73 }
74
75 impl RawCommunicator {
76 pub fn new(
77 stdin: Option<File>,
78 stdout: Option<File>,
79 stderr: Option<File>,
80 input_data: Option<Vec<u8>>,
81 ) -> RawCommunicator {
82 let input_data = input_data.unwrap_or_else(Vec::new);
83 RawCommunicator {
84 stdin,
85 stdout,
86 stderr,
87 input_data,
88 input_pos: 0,
89 }
90 }
91
92 fn do_read(
93 source_ref: &mut Option<&File>,
94 dest: &mut Vec<u8>,
95 size_limit: Option<usize>,
96 total_read: usize,
97 ) -> io::Result<()> {
98 let mut buf = &mut [0u8; 4096][..];
99 if let Some(size_limit) = size_limit {
100 if total_read >= size_limit {
101 return Ok(());
102 }
103 if size_limit - total_read < buf.len() {
104 buf = &mut buf[0..size_limit - total_read];
105 }
106 }
107 let n = source_ref.unwrap().read(buf)?;
108 if n != 0 {
109 dest.extend_from_slice(&buf[..n]);
110 } else {
111 *source_ref = None;
112 }
113 Ok(())
114 }
115
116 fn read_into(
117 &mut self,
118 deadline: Option<Instant>,
119 size_limit: Option<usize>,
120 outvec: &mut Vec<u8>,
121 errvec: &mut Vec<u8>,
122 ) -> io::Result<()> {
123 const WRITE_SIZE: usize = 4096;
126
127 let mut stdout_ref = self.stdout.as_ref();
128 let mut stderr_ref = self.stderr.as_ref();
129
130 loop {
131 if let Some(size_limit) = size_limit {
132 if outvec.len() + errvec.len() >= size_limit {
133 break;
134 }
135 }
136
137 if let (None, None, None) = (self.stdin.as_ref(), stdout_ref, stderr_ref) {
138 break;
140 }
141
142 let (in_ready, out_ready, err_ready) =
143 maybe_poll(self.stdin.as_ref(), stdout_ref, stderr_ref, deadline)?;
144 if !in_ready && !out_ready && !err_ready {
145 return Err(io::Error::new(io::ErrorKind::TimedOut, "timeout"));
146 }
147 if in_ready {
148 let input = &self.input_data[self.input_pos..];
149 let chunk = &input[..min(WRITE_SIZE, input.len())];
150 let n = self.stdin.as_ref().unwrap().write(chunk)?;
151 self.input_pos += n;
152 if self.input_pos == self.input_data.len() {
153 self.stdin.take();
155 self.input_data = Vec::new();
157 }
158 }
159 if out_ready {
160 RawCommunicator::do_read(
161 &mut stdout_ref,
162 outvec,
163 size_limit,
164 outvec.len() + errvec.len(),
165 )?;
166 }
167 if err_ready {
168 RawCommunicator::do_read(
169 &mut stderr_ref,
170 errvec,
171 size_limit,
172 outvec.len() + errvec.len(),
173 )?;
174 }
175 }
176
177 Ok(())
178 }
179
180 pub fn read(
181 &mut self,
182 deadline: Option<Instant>,
183 size_limit: Option<usize>,
184 ) -> (Option<io::Error>, (Option<Vec<u8>>, Option<Vec<u8>>)) {
185 let mut outvec = vec![];
186 let mut errvec = vec![];
187
188 let err = self
189 .read_into(deadline, size_limit, &mut outvec, &mut errvec)
190 .err();
191 let output = (
192 self.stdout.as_ref().map(|_| outvec),
193 self.stderr.as_ref().map(|_| errvec),
194 );
195 (err, output)
196 }
197 }
198}
199
200#[cfg(windows)]
201mod raw {
202 use std::fs::File;
203 use std::io::{self, Read, Write};
204 use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
205 use std::thread;
206 use std::time::Instant;
207
208 #[derive(Debug, Copy, Clone)]
209 enum StreamIdent {
210 In = 1 << 0,
211 Out = 1 << 1,
212 Err = 1 << 2,
213 }
214
215 enum Payload {
216 Data(Vec<u8>),
217 EOF,
218 Err(io::Error),
219 }
220
221 type Message = (StreamIdent, Payload);
223
224 fn read_and_transmit(mut outfile: File, ident: StreamIdent, sink: SyncSender<Message>) {
225 let mut chunk = [0u8; 4096];
226 loop {
231 match outfile.read(&mut chunk) {
232 Ok(0) => {
233 let _ = sink.send((ident, Payload::EOF));
234 break;
235 }
236 Ok(nread) => {
237 if let Err(_) = sink.send((ident, Payload::Data(chunk[..nread].to_vec()))) {
238 break;
239 }
240 }
241 Err(e) => {
242 let _ = sink.send((ident, Payload::Err(e)));
243 break;
244 }
245 }
246 }
247 }
248
249 fn spawn_with_arg<T: Send + 'static>(f: impl FnOnce(T) + Send + 'static, arg: T) {
250 thread::spawn(move || f(arg));
251 }
252
253 #[derive(Debug)]
254 pub struct RawCommunicator {
255 rx: mpsc::Receiver<Message>,
256 helper_set: u8,
257 requested_streams: u8,
258 leftover: Option<(StreamIdent, Vec<u8>)>,
259 }
260
261 struct Timeout;
262
263 impl RawCommunicator {
264 pub fn new(
265 stdin: Option<File>,
266 stdout: Option<File>,
267 stderr: Option<File>,
268 input_data: Option<Vec<u8>>,
269 ) -> RawCommunicator {
270 let mut helper_set = 0u8;
271 let mut requested_streams = 0u8;
272
273 let read_stdout = stdout.map(|stdout| {
274 helper_set |= StreamIdent::Out as u8;
275 requested_streams |= StreamIdent::Out as u8;
276 |tx| read_and_transmit(stdout, StreamIdent::Out, tx)
277 });
278 let read_stderr = stderr.map(|stderr| {
279 helper_set |= StreamIdent::Err as u8;
280 requested_streams |= StreamIdent::Err as u8;
281 |tx| read_and_transmit(stderr, StreamIdent::Err, tx)
282 });
283 let write_stdin = stdin.map(|mut stdin| {
284 let input_data = input_data.expect("must provide input to redirected stdin");
285 helper_set |= StreamIdent::In as u8;
286 move |tx: SyncSender<_>| match stdin.write_all(&input_data) {
287 Ok(()) => drop(tx.send((StreamIdent::In, Payload::EOF))),
288 Err(e) => drop(tx.send((StreamIdent::In, Payload::Err(e)))),
289 }
290 });
291
292 let (tx, rx) = mpsc::sync_channel(0);
293
294 read_stdout.map(|f| spawn_with_arg(f, tx.clone()));
295 read_stderr.map(|f| spawn_with_arg(f, tx.clone()));
296 write_stdin.map(|f| spawn_with_arg(f, tx.clone()));
297
298 RawCommunicator {
299 rx,
300 helper_set,
301 requested_streams,
302 leftover: None,
303 }
304 }
305
306 fn recv_until(&self, deadline: Option<Instant>) -> Result<Message, Timeout> {
307 if let Some(deadline) = deadline {
308 match self
309 .rx
310 .recv_timeout(deadline.saturating_duration_since(Instant::now()))
311 {
312 Ok(message) => Ok(message),
313 Err(RecvTimeoutError::Timeout) => Err(Timeout),
314 Err(RecvTimeoutError::Disconnected) => unreachable!(),
317 }
318 } else {
319 Ok(self.rx.recv().unwrap())
320 }
321 }
322
323 fn read_into(
324 &mut self,
325 deadline: Option<Instant>,
326 size_limit: Option<usize>,
327 outvec: &mut Vec<u8>,
328 errvec: &mut Vec<u8>,
329 ) -> io::Result<()> {
330 let mut grow_result =
331 |ident, mut data: &[u8], leftover: &mut Option<(StreamIdent, Vec<u8>)>| {
332 if let Some(size_limit) = size_limit {
333 let total_read = outvec.len() + errvec.len();
334 if total_read >= size_limit {
335 return false;
336 }
337 let remaining = size_limit - total_read;
338 if data.len() > remaining {
339 *leftover = Some((ident, data[remaining..].to_vec()));
340 data = &data[..remaining];
341 }
342 }
343 match ident {
344 StreamIdent::Out => outvec.extend_from_slice(data),
345 StreamIdent::Err => errvec.extend_from_slice(data),
346 StreamIdent::In => unreachable!(),
347 }
348 if let Some(size_limit) = size_limit {
349 if outvec.len() + errvec.len() >= size_limit {
350 return false;
351 }
352 }
353 return true;
354 };
355
356 if let Some((ident, data)) = self.leftover.take() {
357 if !grow_result(ident, &data, &mut self.leftover) {
358 return Ok(());
359 }
360 }
361
362 while self.helper_set != 0 {
363 match self.recv_until(deadline) {
364 Ok((ident, Payload::EOF)) => {
365 self.helper_set &= !(ident as u8);
366 continue;
367 }
368 Ok((ident, Payload::Data(data))) => {
369 assert!(data.len() != 0);
370 if !grow_result(ident, &data, &mut self.leftover) {
371 break;
372 }
373 }
374 Ok((_ident, Payload::Err(e))) => {
375 return Err(e);
376 }
377 Err(Timeout) => {
378 return Err(io::Error::new(io::ErrorKind::TimedOut, "timeout"));
379 }
380 }
381 }
382 Ok(())
383 }
384
385 pub fn read(
386 &mut self,
387 deadline: Option<Instant>,
388 size_limit: Option<usize>,
389 ) -> (Option<io::Error>, (Option<Vec<u8>>, Option<Vec<u8>>)) {
390 let mut outvec = vec![];
393 let mut errvec = vec![];
394
395 let err = self
396 .read_into(deadline, size_limit, &mut outvec, &mut errvec)
397 .err();
398 let output = {
399 let (mut o, mut e) = (None, None);
400 if self.requested_streams & StreamIdent::Out as u8 != 0 {
401 o = Some(outvec);
402 } else {
403 assert!(outvec.len() == 0);
404 }
405 if self.requested_streams & StreamIdent::Err as u8 != 0 {
406 e = Some(errvec);
407 } else {
408 assert!(errvec.len() == 0);
409 }
410 (o, e)
411 };
412 (err, output)
413 }
414 }
415}
416
417use raw::RawCommunicator;
418
419#[must_use]
430#[derive(Debug)]
431pub struct Communicator {
432 inner: RawCommunicator,
433 size_limit: Option<usize>,
434 time_limit: Option<Duration>,
435}
436
437impl Communicator {
438 fn new(
439 stdin: Option<File>,
440 stdout: Option<File>,
441 stderr: Option<File>,
442 input_data: Option<Vec<u8>>,
443 ) -> Communicator {
444 Communicator {
445 inner: RawCommunicator::new(stdin, stdout, stderr, input_data),
446 size_limit: None,
447 time_limit: None,
448 }
449 }
450
451 pub fn read(&mut self) -> Result<(Option<Vec<u8>>, Option<Vec<u8>>), CommunicateError> {
500 let deadline = self.time_limit.map(|timeout| Instant::now() + timeout);
501 match self.inner.read(deadline, self.size_limit) {
502 (None, capture) => Ok(capture),
503 (Some(error), capture) => Err(CommunicateError { error, capture }),
504 }
505 }
506
507 pub fn read_string(&mut self) -> Result<(Option<String>, Option<String>), CommunicateError> {
513 let (o, e) = self.read()?;
514 Ok((o.map(from_utf8_lossy), e.map(from_utf8_lossy)))
515 }
516
517 pub fn limit_size(mut self, size: usize) -> Communicator {
520 self.size_limit = Some(size);
521 self
522 }
523
524 pub fn limit_time(mut self, time: Duration) -> Communicator {
527 self.time_limit = Some(time);
528 self
529 }
530}
531
532fn from_utf8_lossy(v: Vec<u8>) -> String {
535 match String::from_utf8(v) {
536 Ok(s) => s,
537 Err(e) => String::from_utf8_lossy(e.as_bytes()).into(),
538 }
539}
540
541pub fn communicate(
542 stdin: Option<File>,
543 stdout: Option<File>,
544 stderr: Option<File>,
545 input_data: Option<Vec<u8>>,
546) -> Communicator {
547 if stdin.is_some() {
548 input_data
549 .as_ref()
550 .expect("must provide input to redirected stdin");
551 } else {
552 assert!(
553 input_data.as_ref().is_none(),
554 "cannot provide input to non-redirected stdin"
555 );
556 }
557 Communicator::new(stdin, stdout, stderr, input_data)
558}
559
560#[derive(Debug)]
568pub struct CommunicateError {
569 pub error: io::Error,
571 pub capture: (Option<Vec<u8>>, Option<Vec<u8>>),
573}
574
575impl CommunicateError {
576 pub fn kind(&self) -> ErrorKind {
580 self.error.kind()
581 }
582}
583
584impl Error for CommunicateError {
585 fn source(&self) -> Option<&(dyn Error + 'static)> {
586 self.error.source()
587 }
588}
589
590impl fmt::Display for CommunicateError {
591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592 self.error.fmt(f)
593 }
594}