subprocess/
communicate.rs

1use std::error::Error;
2use std::fmt;
3use std::fs::File;
4use std::io::{self, ErrorKind};
5use std::time::{Duration, Instant};
6
7#[cfg(unix)]
8mod raw {
9    use crate::posix;
10    use std::cmp::min;
11    use std::fs::File;
12    use std::io::{self, Read, Write};
13    use std::time::{Duration, Instant};
14
15    fn as_pollfd<'a>(f: Option<&'a File>, for_read: bool) -> posix::PollFd<'a> {
16        let events = if for_read {
17            posix::POLLIN
18        } else {
19            posix::POLLOUT
20        };
21        posix::PollFd::new(f, events)
22    }
23
24    fn maybe_poll(
25        fin: Option<&File>,
26        fout: Option<&File>,
27        ferr: Option<&File>,
28        deadline: Option<Instant>,
29    ) -> io::Result<(bool, bool, bool)> {
30        // Polling is needed to prevent deadlock when interacting with
31        // multiple streams, and for timeout.  If we're interacting with a
32        // single stream without timeout, we can skip the actual poll()
33        // syscall and just tell the caller to go ahead with reading/writing.
34        if deadline.is_none() {
35            match (&fin, &fout, &ferr) {
36                (None, None, Some(..)) => return Ok((false, false, true)),
37                (None, Some(..), None) => return Ok((false, true, false)),
38                (Some(..), None, None) => return Ok((true, false, false)),
39                _ => (),
40            }
41        }
42
43        let timeout = deadline.map(|deadline| {
44            let now = Instant::now();
45            if now >= deadline {
46                Duration::from_secs(0)
47            } else {
48                deadline - now
49            }
50        });
51
52        let mut fds = [
53            as_pollfd(fin, false),
54            as_pollfd(fout, true),
55            as_pollfd(ferr, true),
56        ];
57        posix::poll(&mut fds, timeout)?;
58
59        Ok((
60            fds[0].test(posix::POLLOUT | posix::POLLHUP),
61            fds[1].test(posix::POLLIN | posix::POLLHUP),
62            fds[2].test(posix::POLLIN | posix::POLLHUP),
63        ))
64    }
65
    /// Unix implementation of the communicator: the parent's ends of the
    /// child's standard streams plus the input that is still to be sent.
    #[derive(Debug)]
    pub struct RawCommunicator {
        // Pipe to the child's stdin; reset to `None` by `read_into` once all
        // of `input_data` has been written, so that the child receives EOF.
        stdin: Option<File>,
        // Pipe from which the child's standard output is read, if any.
        stdout: Option<File>,
        // Pipe from which the child's standard error is read, if any.
        stderr: Option<File>,
        // Bytes to write to `stdin`; replaced by an empty vector once fully
        // written.
        input_data: Vec<u8>,
        // Offset into `input_data` of the first byte not yet written.
        input_pos: usize,
    }
74
75    impl RawCommunicator {
76        pub fn new(
77            stdin: Option<File>,
78            stdout: Option<File>,
79            stderr: Option<File>,
80            input_data: Option<Vec<u8>>,
81        ) -> RawCommunicator {
82            let input_data = input_data.unwrap_or_else(Vec::new);
83            RawCommunicator {
84                stdin,
85                stdout,
86                stderr,
87                input_data,
88                input_pos: 0,
89            }
90        }
91
92        fn do_read(
93            source_ref: &mut Option<&File>,
94            dest: &mut Vec<u8>,
95            size_limit: Option<usize>,
96            total_read: usize,
97        ) -> io::Result<()> {
98            let mut buf = &mut [0u8; 4096][..];
99            if let Some(size_limit) = size_limit {
100                if total_read >= size_limit {
101                    return Ok(());
102                }
103                if size_limit - total_read < buf.len() {
104                    buf = &mut buf[0..size_limit - total_read];
105                }
106            }
107            let n = source_ref.unwrap().read(buf)?;
108            if n != 0 {
109                dest.extend_from_slice(&buf[..n]);
110            } else {
111                *source_ref = None;
112            }
113            Ok(())
114        }
115
116        fn read_into(
117            &mut self,
118            deadline: Option<Instant>,
119            size_limit: Option<usize>,
120            outvec: &mut Vec<u8>,
121            errvec: &mut Vec<u8>,
122        ) -> io::Result<()> {
123            // Note: chunk size for writing must be smaller than the pipe buffer
124            // size.  A large enough write to a pipe deadlocks despite polling.
125            const WRITE_SIZE: usize = 4096;
126
127            let mut stdout_ref = self.stdout.as_ref();
128            let mut stderr_ref = self.stderr.as_ref();
129
130            loop {
131                if let Some(size_limit) = size_limit {
132                    if outvec.len() + errvec.len() >= size_limit {
133                        break;
134                    }
135                }
136
137                if let (None, None, None) = (self.stdin.as_ref(), stdout_ref, stderr_ref) {
138                    // When no stream remains, we are done.
139                    break;
140                }
141
142                let (in_ready, out_ready, err_ready) =
143                    maybe_poll(self.stdin.as_ref(), stdout_ref, stderr_ref, deadline)?;
144                if !in_ready && !out_ready && !err_ready {
145                    return Err(io::Error::new(io::ErrorKind::TimedOut, "timeout"));
146                }
147                if in_ready {
148                    let input = &self.input_data[self.input_pos..];
149                    let chunk = &input[..min(WRITE_SIZE, input.len())];
150                    let n = self.stdin.as_ref().unwrap().write(chunk)?;
151                    self.input_pos += n;
152                    if self.input_pos == self.input_data.len() {
153                        // close stdin when done writing, so the child receives EOF
154                        self.stdin.take();
155                        // deallocate the input data, we don't need it any more
156                        self.input_data = Vec::new();
157                    }
158                }
159                if out_ready {
160                    RawCommunicator::do_read(
161                        &mut stdout_ref,
162                        outvec,
163                        size_limit,
164                        outvec.len() + errvec.len(),
165                    )?;
166                }
167                if err_ready {
168                    RawCommunicator::do_read(
169                        &mut stderr_ref,
170                        errvec,
171                        size_limit,
172                        outvec.len() + errvec.len(),
173                    )?;
174                }
175            }
176
177            Ok(())
178        }
179
180        pub fn read(
181            &mut self,
182            deadline: Option<Instant>,
183            size_limit: Option<usize>,
184        ) -> (Option<io::Error>, (Option<Vec<u8>>, Option<Vec<u8>>)) {
185            let mut outvec = vec![];
186            let mut errvec = vec![];
187
188            let err = self
189                .read_into(deadline, size_limit, &mut outvec, &mut errvec)
190                .err();
191            let output = (
192                self.stdout.as_ref().map(|_| outvec),
193                self.stderr.as_ref().map(|_| errvec),
194            );
195            (err, output)
196        }
197    }
198}
199
200#[cfg(windows)]
201mod raw {
202    use std::fs::File;
203    use std::io::{self, Read, Write};
204    use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
205    use std::thread;
206    use std::time::Instant;
207
    /// Identifies one of the child's standard streams.
    ///
    /// Each variant is a distinct bit, so a set of streams can be kept in a
    /// `u8` mask (as done by `helper_set` and `requested_streams`).
    #[derive(Debug, Copy, Clone)]
    enum StreamIdent {
        In = 1 << 0,
        Out = 1 << 1,
        Err = 1 << 2,
    }
214
    /// What a helper thread reports about its stream.
    enum Payload {
        /// A non-empty chunk of bytes read from the stream.
        Data(Vec<u8>),
        /// The helper is done: a reader reached end-of-file, or the stdin
        /// writer wrote all of its input.
        EOF,
        /// The helper's I/O operation failed; the helper exits after sending
        /// this.
        Err(io::Error),
    }
220
    // Message sent by a helper thread to the RawCommunicator: the stream it
    // concerns and what happened on that stream.
    type Message = (StreamIdent, Payload);
223
224    fn read_and_transmit(mut outfile: File, ident: StreamIdent, sink: SyncSender<Message>) {
225        let mut chunk = [0u8; 4096];
226        // Note: failing to send to the sink means we're done.  Sending will
227        // fail if the main thread drops the RawCommunicator (and with it the
228        // receiver) prematurely e.g. because a limit was reached or another
229        // helper encountered an IO error.
230        loop {
231            match outfile.read(&mut chunk) {
232                Ok(0) => {
233                    let _ = sink.send((ident, Payload::EOF));
234                    break;
235                }
236                Ok(nread) => {
237                    if let Err(_) = sink.send((ident, Payload::Data(chunk[..nread].to_vec()))) {
238                        break;
239                    }
240                }
241                Err(e) => {
242                    let _ = sink.send((ident, Payload::Err(e)));
243                    break;
244                }
245            }
246        }
247    }
248
249    fn spawn_with_arg<T: Send + 'static>(f: impl FnOnce(T) + Send + 'static, arg: T) {
250        thread::spawn(move || f(arg));
251    }
252
    /// Windows implementation of the communicator: helper threads do the
    /// blocking I/O and report to this struct over a channel.
    #[derive(Debug)]
    pub struct RawCommunicator {
        // Receiving end of the channel the helper threads send to.
        rx: mpsc::Receiver<Message>,
        // Bitmask of `StreamIdent`s whose helper was started; a helper's bit
        // is cleared once it reports that it has finished.
        helper_set: u8,
        // Bitmask of the output streams (`Out`/`Err`) that were passed to
        // `new`; `read` uses it to decide between `Some` and `None`.
        requested_streams: u8,
        // Data received from a helper that did not fit under the size limit;
        // it is consumed at the start of the next `read_into`.
        leftover: Option<(StreamIdent, Vec<u8>)>,
    }
260
    /// Marker returned by `recv_until` when the deadline passes before any
    /// helper sends a message.
    struct Timeout;
262
263    impl RawCommunicator {
264        pub fn new(
265            stdin: Option<File>,
266            stdout: Option<File>,
267            stderr: Option<File>,
268            input_data: Option<Vec<u8>>,
269        ) -> RawCommunicator {
270            let mut helper_set = 0u8;
271            let mut requested_streams = 0u8;
272
273            let read_stdout = stdout.map(|stdout| {
274                helper_set |= StreamIdent::Out as u8;
275                requested_streams |= StreamIdent::Out as u8;
276                |tx| read_and_transmit(stdout, StreamIdent::Out, tx)
277            });
278            let read_stderr = stderr.map(|stderr| {
279                helper_set |= StreamIdent::Err as u8;
280                requested_streams |= StreamIdent::Err as u8;
281                |tx| read_and_transmit(stderr, StreamIdent::Err, tx)
282            });
283            let write_stdin = stdin.map(|mut stdin| {
284                let input_data = input_data.expect("must provide input to redirected stdin");
285                helper_set |= StreamIdent::In as u8;
286                move |tx: SyncSender<_>| match stdin.write_all(&input_data) {
287                    Ok(()) => drop(tx.send((StreamIdent::In, Payload::EOF))),
288                    Err(e) => drop(tx.send((StreamIdent::In, Payload::Err(e)))),
289                }
290            });
291
292            let (tx, rx) = mpsc::sync_channel(0);
293
294            read_stdout.map(|f| spawn_with_arg(f, tx.clone()));
295            read_stderr.map(|f| spawn_with_arg(f, tx.clone()));
296            write_stdin.map(|f| spawn_with_arg(f, tx.clone()));
297
298            RawCommunicator {
299                rx,
300                helper_set,
301                requested_streams,
302                leftover: None,
303            }
304        }
305
306        fn recv_until(&self, deadline: Option<Instant>) -> Result<Message, Timeout> {
307            if let Some(deadline) = deadline {
308                match self
309                    .rx
310                    .recv_timeout(deadline.saturating_duration_since(Instant::now()))
311                {
312                    Ok(message) => Ok(message),
313                    Err(RecvTimeoutError::Timeout) => Err(Timeout),
314                    // should never be disconnected, the helper threads always
315                    // announce their exit beforehand
316                    Err(RecvTimeoutError::Disconnected) => unreachable!(),
317                }
318            } else {
319                Ok(self.rx.recv().unwrap())
320            }
321        }
322
323        fn read_into(
324            &mut self,
325            deadline: Option<Instant>,
326            size_limit: Option<usize>,
327            outvec: &mut Vec<u8>,
328            errvec: &mut Vec<u8>,
329        ) -> io::Result<()> {
330            let mut grow_result =
331                |ident, mut data: &[u8], leftover: &mut Option<(StreamIdent, Vec<u8>)>| {
332                    if let Some(size_limit) = size_limit {
333                        let total_read = outvec.len() + errvec.len();
334                        if total_read >= size_limit {
335                            return false;
336                        }
337                        let remaining = size_limit - total_read;
338                        if data.len() > remaining {
339                            *leftover = Some((ident, data[remaining..].to_vec()));
340                            data = &data[..remaining];
341                        }
342                    }
343                    match ident {
344                        StreamIdent::Out => outvec.extend_from_slice(data),
345                        StreamIdent::Err => errvec.extend_from_slice(data),
346                        StreamIdent::In => unreachable!(),
347                    }
348                    if let Some(size_limit) = size_limit {
349                        if outvec.len() + errvec.len() >= size_limit {
350                            return false;
351                        }
352                    }
353                    return true;
354                };
355
356            if let Some((ident, data)) = self.leftover.take() {
357                if !grow_result(ident, &data, &mut self.leftover) {
358                    return Ok(());
359                }
360            }
361
362            while self.helper_set != 0 {
363                match self.recv_until(deadline) {
364                    Ok((ident, Payload::EOF)) => {
365                        self.helper_set &= !(ident as u8);
366                        continue;
367                    }
368                    Ok((ident, Payload::Data(data))) => {
369                        assert!(data.len() != 0);
370                        if !grow_result(ident, &data, &mut self.leftover) {
371                            break;
372                        }
373                    }
374                    Ok((_ident, Payload::Err(e))) => {
375                        return Err(e);
376                    }
377                    Err(Timeout) => {
378                        return Err(io::Error::new(io::ErrorKind::TimedOut, "timeout"));
379                    }
380                }
381            }
382            Ok(())
383        }
384
385        pub fn read(
386            &mut self,
387            deadline: Option<Instant>,
388            size_limit: Option<usize>,
389        ) -> (Option<io::Error>, (Option<Vec<u8>>, Option<Vec<u8>>)) {
390            // Create both vectors immediately.  This doesn't allocate, and if
391            // one of those is not needed, it just won't get resized.
392            let mut outvec = vec![];
393            let mut errvec = vec![];
394
395            let err = self
396                .read_into(deadline, size_limit, &mut outvec, &mut errvec)
397                .err();
398            let output = {
399                let (mut o, mut e) = (None, None);
400                if self.requested_streams & StreamIdent::Out as u8 != 0 {
401                    o = Some(outvec);
402                } else {
403                    assert!(outvec.len() == 0);
404                }
405                if self.requested_streams & StreamIdent::Err as u8 != 0 {
406                    e = Some(errvec);
407                } else {
408                    assert!(errvec.len() == 0);
409                }
410                (o, e)
411            };
412            (err, output)
413        }
414    }
415}
416
417use raw::RawCommunicator;
418
/// Unattended data exchange with the subprocess.
///
/// When a subprocess both expects input and provides output, care must be
/// taken to avoid deadlock.  The issue arises when the subprocess responds to
/// part of the input data by providing some output which must be read for the
/// subprocess to accept further input.  If the parent process is blocked on
/// writing the input, it cannot read the output and a deadlock occurs.  This
/// implementation avoids this issue by reading from and writing to the
/// subprocess in parallel.  On Unix-like systems this is achieved using
/// `poll()`, and on Windows using threads.
#[must_use]
#[derive(Debug)]
pub struct Communicator {
    // Platform-specific implementation that performs the actual I/O.
    inner: RawCommunicator,
    // Maximum amount of data a `read()` returns; set by `limit_size()`.
    size_limit: Option<usize>,
    // Maximum time a `read()` may take; set by `limit_time()`.
    time_limit: Option<Duration>,
}
436
437impl Communicator {
438    fn new(
439        stdin: Option<File>,
440        stdout: Option<File>,
441        stderr: Option<File>,
442        input_data: Option<Vec<u8>>,
443    ) -> Communicator {
444        Communicator {
445            inner: RawCommunicator::new(stdin, stdout, stderr, input_data),
446            size_limit: None,
447            time_limit: None,
448        }
449    }
450
451    /// Communicate with the subprocess, return the contents of its standard
452    /// output and error.
453    ///
454    /// This will write input data to the subprocess's standard input and
455    /// simultaneously read its standard output and error.  The output and
456    /// error contents are returned as a pair of `Option<Vec>`.  The `None`
457    /// options correspond to streams not specified as `Redirection::Pipe`
458    /// when creating the subprocess.
459    ///
460    /// By default `read()` will read all data until end-of-file.
461    ///
462    /// If `limit_time` has been called, the method will read for no more than
463    /// the specified duration.  In case of timeout, an error of kind
464    /// `io::ErrorKind::TimedOut` is returned.  Communication may be resumed
465    /// after the timeout by calling `read()` again.
466    ///
467    /// If `limit_size` has been called, it will limit the allocation done by
468    /// this method.  If the subprocess provides more data than the limit
469    /// specifies, `read()` will successfully return as much data as specified
470    /// by the limit.  (It might internally read a bit more from the
471    /// subprocess, but the data will remain available for future reads.)
472    /// Subsequent data can be retrieved by calling `read()` again, which can
473    /// be repeated until `read()` returns all-empty data, which marks EOF.
474    ///
475    /// Note that this method does not wait for the subprocess to finish, only
476    /// to close its output/error streams.  It is rare but possible for the
477    /// program to continue running after having closed the streams, in which
478    /// case `Popen::Drop` will wait for it to finish.  If such a wait is
479    /// undesirable, it can be prevented by waiting explicitly using `wait()`,
480    /// by detaching the process using `detach()`, or by terminating it with
481    /// `terminate()`.
482    ///
483    /// # Panics
484    ///
485    /// If `input_data` is provided and `stdin` was not redirected to a pipe.
486    /// Also, if `input_data` is not provided and `stdin` was redirected to a
487    /// pipe.
488    ///
489    /// # Errors
490    ///
491    /// * `Err(CommunicateError)` if a system call fails.  In case of timeout,
492    /// the underlying error kind will be `ErrorKind::TimedOut`.
493    ///
494    /// Regardless of the nature of the error, the content prior to the error
495    /// can be retrieved using the [`capture`] attribute of the error.
496    ///
497    /// [`capture`]: struct.CommunicateError.html#structfield.capture
498
499    pub fn read(&mut self) -> Result<(Option<Vec<u8>>, Option<Vec<u8>>), CommunicateError> {
500        let deadline = self.time_limit.map(|timeout| Instant::now() + timeout);
501        match self.inner.read(deadline, self.size_limit) {
502            (None, capture) => Ok(capture),
503            (Some(error), capture) => Err(CommunicateError { error, capture }),
504        }
505    }
506
507    /// Return the subprocess's output and error contents as strings.
508    ///
509    /// Like `read()`, but returns strings instead of byte vectors.  Invalid
510    /// UTF-8 sequences, if found, are replaced with the the `U+FFFD` Unicode
511    /// replacement character.
512    pub fn read_string(&mut self) -> Result<(Option<String>, Option<String>), CommunicateError> {
513        let (o, e) = self.read()?;
514        Ok((o.map(from_utf8_lossy), e.map(from_utf8_lossy)))
515    }
516
517    /// Limit the amount of data the next `read()` will read from the
518    /// subprocess.
519    pub fn limit_size(mut self, size: usize) -> Communicator {
520        self.size_limit = Some(size);
521        self
522    }
523
524    /// Limit the amount of time the next `read()` will spend reading from the
525    /// subprocess.
526    pub fn limit_time(mut self, time: Duration) -> Communicator {
527        self.time_limit = Some(time);
528        self
529    }
530}
531
/// Convert bytes to a `String`, replacing invalid UTF-8 sequences with the
/// `U+FFFD` replacement character.
///
/// Like `String::from_utf8_lossy()`, but takes `Vec<u8>` and reuses its
/// storage when the bytes are already valid UTF-8.
fn from_utf8_lossy(v: Vec<u8>) -> String {
    String::from_utf8(v)
        .unwrap_or_else(|invalid| String::from_utf8_lossy(invalid.as_bytes()).into_owned())
}
540
541pub fn communicate(
542    stdin: Option<File>,
543    stdout: Option<File>,
544    stderr: Option<File>,
545    input_data: Option<Vec<u8>>,
546) -> Communicator {
547    if stdin.is_some() {
548        input_data
549            .as_ref()
550            .expect("must provide input to redirected stdin");
551    } else {
552        assert!(
553            input_data.as_ref().is_none(),
554            "cannot provide input to non-redirected stdin"
555        );
556    }
557    Communicator::new(stdin, stdout, stderr, input_data)
558}
559
/// Error during communication.
///
/// Wraps the `io::Error` that interrupted the exchange (the `error` field)
/// together with whatever data had been captured up to that point (the
/// `capture` field).
///
/// The error description and cause are taken from the underlying IO error.
#[derive(Debug)]
pub struct CommunicateError {
    /// The underlying `io::Error`.
    pub error: io::Error,
    /// The data captured before the error was encountered.
    pub capture: (Option<Vec<u8>>, Option<Vec<u8>>),
}

impl CommunicateError {
    /// Returns the corresponding IO `ErrorKind` for this error.
    ///
    /// Equivalent to `self.error.kind()`.
    pub fn kind(&self) -> ErrorKind {
        let CommunicateError { error, .. } = self;
        error.kind()
    }
}

impl Error for CommunicateError {
    /// Forwards to the source of the wrapped IO error.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Error::source(&self.error)
    }
}

impl fmt::Display for CommunicateError {
    /// Formats exactly like the wrapped IO error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.error, f)
    }
}
594}