xref: /drstd/src/std/sys/windows/pipe.rs (revision 86982c5e9b2eaa583327251616ee822c36288824)
1 use crate::std::os::windows::prelude::*;
2 
3 use crate::std::ffi::OsStr;
4 use crate::std::io::{self, BorrowedCursor, IoSlice, IoSliceMut, Read};
5 use crate::std::mem;
6 use crate::std::path::Path;
7 use crate::std::ptr;
8 use crate::std::slice;
9 use crate::std::sync::atomic::AtomicUsize;
10 use crate::std::sync::atomic::Ordering::SeqCst;
11 use crate::std::sys::c;
12 use crate::std::sys::fs::{File, OpenOptions};
13 use crate::std::sys::handle::Handle;
14 use crate::std::sys::hashmap_random_keys;
15 use crate::std::sys_common::IntoInner;
16 
17 ////////////////////////////////////////////////////////////////////////////////
18 // Anonymous pipes
19 ////////////////////////////////////////////////////////////////////////////////
20 
21 pub struct AnonPipe {
22     inner: Handle,
23 }
24 
25 impl IntoInner<Handle> for AnonPipe {
26     fn into_inner(self) -> Handle {
27         self.inner
28     }
29 }
30 
31 pub struct Pipes {
32     pub ours: AnonPipe,
33     pub theirs: AnonPipe,
34 }
35 
36 /// Although this looks similar to `anon_pipe` in the Unix module it's actually
37 /// subtly different. Here we'll return two pipes in the `Pipes` return value,
38 /// but one is intended for "us" where as the other is intended for "someone
39 /// else".
40 ///
41 /// Currently the only use case for this function is pipes for stdio on
42 /// processes in the standard library, so "ours" is the one that'll stay in our
43 /// process whereas "theirs" will be inherited to a child.
44 ///
45 /// The ours/theirs pipes are *not* specifically readable or writable. Each
46 /// one only supports a read or a write, but which is which depends on the
47 /// boolean flag given. If `ours_readable` is `true`, then `ours` is readable and
48 /// `theirs` is writable. Conversely, if `ours_readable` is `false`, then `ours`
49 /// is writable and `theirs` is readable.
50 ///
51 /// Also note that the `ours` pipe is always a handle opened up in overlapped
52 /// mode. This means that technically speaking it should only ever be used
53 /// with `OVERLAPPED` instances, but also works out ok if it's only ever used
54 /// once at a time (which we do indeed guarantee).
55 pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
56     // A 64kb pipe capacity is the same as a typical Linux default.
57     const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
58 
59     // Note that we specifically do *not* use `CreatePipe` here because
60     // unfortunately the anonymous pipes returned do not support overlapped
61     // operations. Instead, we create a "hopefully unique" name and create a
62     // named pipe which has overlapped operations enabled.
63     //
64     // Once we do this, we connect do it as usual via `CreateFileW`, and then
65     // we return those reader/writer halves. Note that the `ours` pipe return
66     // value is always the named pipe, whereas `theirs` is just the normal file.
67     // This should hopefully shield us from child processes which assume their
68     // stdout is a named pipe, which would indeed be odd!
69     unsafe {
70         let ours;
71         let mut name;
72         let mut tries = 0;
73         let mut reject_remote_clients_flag = c::PIPE_REJECT_REMOTE_CLIENTS;
74         loop {
75             tries += 1;
76             name = format!(
77                 r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
78                 c::GetCurrentProcessId(),
79                 random_number()
80             );
81             let wide_name = OsStr::new(&name)
82                 .encode_wide()
83                 .chain(Some(0))
84                 .collect::<Vec<_>>();
85             let mut flags = c::FILE_FLAG_FIRST_PIPE_INSTANCE | c::FILE_FLAG_OVERLAPPED;
86             if ours_readable {
87                 flags |= c::PIPE_ACCESS_INBOUND;
88             } else {
89                 flags |= c::PIPE_ACCESS_OUTBOUND;
90             }
91 
92             let handle = c::CreateNamedPipeW(
93                 wide_name.as_ptr(),
94                 flags,
95                 c::PIPE_TYPE_BYTE
96                     | c::PIPE_READMODE_BYTE
97                     | c::PIPE_WAIT
98                     | reject_remote_clients_flag,
99                 1,
100                 PIPE_BUFFER_CAPACITY,
101                 PIPE_BUFFER_CAPACITY,
102                 0,
103                 ptr::null_mut(),
104             );
105 
106             // We pass the `FILE_FLAG_FIRST_PIPE_INSTANCE` flag above, and we're
107             // also just doing a best effort at selecting a unique name. If
108             // `ERROR_ACCESS_DENIED` is returned then it could mean that we
109             // accidentally conflicted with an already existing pipe, so we try
110             // again.
111             //
112             // Don't try again too much though as this could also perhaps be a
113             // legit error.
114             // If `ERROR_INVALID_PARAMETER` is returned, this probably means we're
115             // running on pre-Vista version where `PIPE_REJECT_REMOTE_CLIENTS` is
116             // not supported, so we continue retrying without it. This implies
117             // reduced security on Windows versions older than Vista by allowing
118             // connections to this pipe from remote machines.
119             // Proper fix would increase the number of FFI imports and introduce
120             // significant amount of Windows XP specific code with no clean
121             // testing strategy
122             // For more info, see https://github.com/rust-lang/rust/pull/37677.
123             if handle == c::INVALID_HANDLE_VALUE {
124                 let err = io::Error::last_os_error();
125                 let raw_os_err = err.raw_os_error();
126                 if tries < 10 {
127                     if raw_os_err == Some(c::ERROR_ACCESS_DENIED as i32) {
128                         continue;
129                     } else if reject_remote_clients_flag != 0
130                         && raw_os_err == Some(c::ERROR_INVALID_PARAMETER as i32)
131                     {
132                         reject_remote_clients_flag = 0;
133                         tries -= 1;
134                         continue;
135                     }
136                 }
137                 return Err(err);
138             }
139             ours = Handle::from_raw_handle(handle);
140             break;
141         }
142 
143         // Connect to the named pipe we just created. This handle is going to be
144         // returned in `theirs`, so if `ours` is readable we want this to be
145         // writable, otherwise if `ours` is writable we want this to be
146         // readable.
147         //
148         // Additionally we don't enable overlapped mode on this because most
149         // client processes aren't enabled to work with that.
150         let mut opts = OpenOptions::new();
151         opts.write(ours_readable);
152         opts.read(!ours_readable);
153         opts.share_mode(0);
154         let size = mem::size_of::<c::SECURITY_ATTRIBUTES>();
155         let mut sa = c::SECURITY_ATTRIBUTES {
156             nLength: size as c::DWORD,
157             lpSecurityDescriptor: ptr::null_mut(),
158             bInheritHandle: their_handle_inheritable as i32,
159         };
160         opts.security_attributes(&mut sa);
161         let theirs = File::open(Path::new(&name), &opts)?;
162         let theirs = AnonPipe {
163             inner: theirs.into_inner(),
164         };
165 
166         Ok(Pipes {
167             ours: AnonPipe { inner: ours },
168             theirs: AnonPipe {
169                 inner: theirs.into_inner(),
170             },
171         })
172     }
173 }
174 
175 /// Takes an asynchronous source pipe and returns a synchronous pipe suitable
176 /// for sending to a child process.
177 ///
178 /// This is achieved by creating a new set of pipes and spawning a thread that
179 /// relays messages between the source and the synchronous pipe.
180 pub fn spawn_pipe_relay(
181     source: &AnonPipe,
182     ours_readable: bool,
183     their_handle_inheritable: bool,
184 ) -> io::Result<AnonPipe> {
185     // We need this handle to live for the lifetime of the thread spawned below.
186     let source = source.duplicate()?;
187 
188     // create a new pair of anon pipes.
189     let Pipes { theirs, ours } = anon_pipe(ours_readable, their_handle_inheritable)?;
190 
191     // Spawn a thread that passes messages from one pipe to the other.
192     // Any errors will simply cause the thread to exit.
193     let (reader, writer) = if ours_readable {
194         (ours, source)
195     } else {
196         (source, ours)
197     };
198     crate::std::thread::spawn(move || {
199         let mut buf = [0_u8; 4096];
200         'reader: while let Ok(len) = reader.read(&mut buf) {
201             if len == 0 {
202                 break;
203             }
204             let mut start = 0;
205             while let Ok(written) = writer.write(&buf[start..len]) {
206                 start += written;
207                 if start == len {
208                     continue 'reader;
209                 }
210             }
211             break;
212         }
213     });
214 
215     // Return the pipe that should be sent to the child process.
216     Ok(theirs)
217 }
218 
219 fn random_number() -> usize {
220     static N: AtomicUsize = AtomicUsize::new(0);
221     loop {
222         if N.load(SeqCst) != 0 {
223             return N.fetch_add(1, SeqCst);
224         }
225 
226         N.store(hashmap_random_keys().0 as usize, SeqCst);
227     }
228 }
229 
230 // Abstracts over `ReadFileEx` and `WriteFileEx`
231 type AlertableIoFn = unsafe extern "system" fn(
232     BorrowedHandle<'_>,
233     c::LPVOID,
234     c::DWORD,
235     c::LPOVERLAPPED,
236     c::LPOVERLAPPED_COMPLETION_ROUTINE,
237 ) -> c::BOOL;
238 
239 impl AnonPipe {
240     pub fn handle(&self) -> &Handle {
241         &self.inner
242     }
243     pub fn into_handle(self) -> Handle {
244         self.inner
245     }
246     fn duplicate(&self) -> io::Result<Self> {
247         self.inner
248             .duplicate(0, false, c::DUPLICATE_SAME_ACCESS)
249             .map(|inner| AnonPipe { inner })
250     }
251 
252     pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
253         let result = unsafe {
254             let len = crate::std::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
255             self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
256         };
257 
258         match result {
259             // The special treatment of BrokenPipe is to deal with Windows
260             // pipe semantics, which yields this error when *reading* from
261             // a pipe after the other end has closed; we interpret that as
262             // EOF on the pipe.
263             Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
264             _ => result,
265         }
266     }
267 
268     pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
269         let result = unsafe {
270             let len = crate::std::cmp::min(buf.capacity(), c::DWORD::MAX as usize) as c::DWORD;
271             self.alertable_io_internal(c::ReadFileEx, buf.as_mut().as_mut_ptr() as _, len)
272         };
273 
274         match result {
275             // The special treatment of BrokenPipe is to deal with Windows
276             // pipe semantics, which yields this error when *reading* from
277             // a pipe after the other end has closed; we interpret that as
278             // EOF on the pipe.
279             Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
280             Err(e) => Err(e),
281             Ok(n) => {
282                 unsafe {
283                     buf.advance(n);
284                 }
285                 Ok(())
286             }
287         }
288     }
289 
290     pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
291         self.inner.read_vectored(bufs)
292     }
293 
294     #[inline]
295     pub fn is_read_vectored(&self) -> bool {
296         self.inner.is_read_vectored()
297     }
298 
299     pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
300         self.handle().read_to_end(buf)
301     }
302 
303     pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
304         unsafe {
305             let len = crate::std::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
306             self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
307         }
308     }
309 
310     pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
311         self.inner.write_vectored(bufs)
312     }
313 
314     #[inline]
315     pub fn is_write_vectored(&self) -> bool {
316         self.inner.is_write_vectored()
317     }
318 
319     /// Synchronizes asynchronous reads or writes using our anonymous pipe.
320     ///
321     /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
322     /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
323     ///
324     /// Note: This should not be used for handles we don't create.
325     ///
326     /// # Safety
327     ///
328     /// `buf` must be a pointer to a buffer that's valid for reads or writes
329     /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
330     ///
331     /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
332     /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
333     /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
334     unsafe fn alertable_io_internal(
335         &self,
336         io: AlertableIoFn,
337         buf: c::LPVOID,
338         len: c::DWORD,
339     ) -> io::Result<usize> {
340         // Use "alertable I/O" to synchronize the pipe I/O.
341         // This has four steps.
342         //
343         // STEP 1: Start the asynchronous I/O operation.
344         //         This simply calls either `ReadFileEx` or `WriteFileEx`,
345         //         giving it a pointer to the buffer and callback function.
346         //
347         // STEP 2: Enter an alertable state.
348         //         The callback set in step 1 will not be called until the thread
349         //         enters an "alertable" state. This can be done using `SleepEx`.
350         //
351         // STEP 3: The callback
352         //         Once the I/O is complete and the thread is in an alertable state,
353         //         the callback will be run on the same thread as the call to
354         //         `ReadFileEx` or `WriteFileEx` done in step 1.
355         //         In the callback we simply set the result of the async operation.
356         //
357         // STEP 4: Return the result.
358         //         At this point we'll have a result from the callback function
359         //         and can simply return it. Note that we must not return earlier,
360         //         while the I/O is still in progress.
361 
362         // The result that will be set from the asynchronous callback.
363         let mut async_result: Option<AsyncResult> = None;
364         struct AsyncResult {
365             error: u32,
366             transferred: u32,
367         }
368 
369         // STEP 3: The callback.
370         unsafe extern "system" fn callback(
371             dwErrorCode: u32,
372             dwNumberOfBytesTransferred: u32,
373             lpOverlapped: *mut c::OVERLAPPED,
374         ) {
375             // Set `async_result` using a pointer smuggled through `hEvent`.
376             let result = AsyncResult {
377                 error: dwErrorCode,
378                 transferred: dwNumberOfBytesTransferred,
379             };
380             *(*lpOverlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
381         }
382 
383         // STEP 1: Start the I/O operation.
384         let mut overlapped: c::OVERLAPPED = crate::std::mem::zeroed();
385         // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
386         // Therefore the documentation suggests using it to smuggle a pointer to the callback.
387         overlapped.hEvent = &mut async_result as *mut _ as *mut _;
388 
389         // Asynchronous read of the pipe.
390         // If successful, `callback` will be called once it completes.
391         let result = io(
392             self.inner.as_handle(),
393             buf,
394             len,
395             &mut overlapped,
396             Some(callback),
397         );
398         if result == c::FALSE {
399             // We can return here because the call failed.
400             // After this we must not return until the I/O completes.
401             return Err(io::Error::last_os_error());
402         }
403 
404         // Wait indefinitely for the result.
405         let result = loop {
406             // STEP 2: Enter an alertable state.
407             // The second parameter of `SleepEx` is used to make this sleep alertable.
408             c::SleepEx(c::INFINITE, c::TRUE);
409             if let Some(result) = async_result {
410                 break result;
411             }
412         };
413         // STEP 4: Return the result.
414         // `async_result` is always `Some` at this point
415         match result.error {
416             c::ERROR_SUCCESS => Ok(result.transferred as usize),
417             error => Err(io::Error::from_raw_os_error(error as _)),
418         }
419     }
420 }
421 
422 pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> {
423     let p1 = p1.into_handle();
424     let p2 = p2.into_handle();
425 
426     let mut p1 = AsyncPipe::new(p1, v1)?;
427     let mut p2 = AsyncPipe::new(p2, v2)?;
428     let objs = [p1.event.as_raw_handle(), p2.event.as_raw_handle()];
429 
430     // In a loop we wait for either pipe's scheduled read operation to complete.
431     // If the operation completes with 0 bytes, that means EOF was reached, in
432     // which case we just finish out the other pipe entirely.
433     //
434     // Note that overlapped I/O is in general super unsafe because we have to
435     // be careful to ensure that all pointers in play are valid for the entire
436     // duration of the I/O operation (where tons of operations can also fail).
437     // The destructor for `AsyncPipe` ends up taking care of most of this.
438     loop {
439         let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) };
440         if res == c::WAIT_OBJECT_0 {
441             if !p1.result()? || !p1.schedule_read()? {
442                 return p2.finish();
443             }
444         } else if res == c::WAIT_OBJECT_0 + 1 {
445             if !p2.result()? || !p2.schedule_read()? {
446                 return p1.finish();
447             }
448         } else {
449             return Err(io::Error::last_os_error());
450         }
451     }
452 }
453 
454 struct AsyncPipe<'a> {
455     pipe: Handle,
456     event: Handle,
457     overlapped: Box<c::OVERLAPPED>, // needs a stable address
458     dst: &'a mut Vec<u8>,
459     state: State,
460 }
461 
462 #[derive(PartialEq, Debug)]
463 enum State {
464     NotReading,
465     Reading,
466     Read(usize),
467 }
468 
469 impl<'a> AsyncPipe<'a> {
470     fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
471         // Create an event which we'll use to coordinate our overlapped
472         // operations, this event will be used in WaitForMultipleObjects
473         // and passed as part of the OVERLAPPED handle.
474         //
475         // Note that we do a somewhat clever thing here by flagging the
476         // event as being manually reset and setting it initially to the
477         // signaled state. This means that we'll naturally fall through the
478         // WaitForMultipleObjects call above for pipes created initially,
479         // and the only time an even will go back to "unset" will be once an
480         // I/O operation is successfully scheduled (what we want).
481         let event = Handle::new_event(true, true)?;
482         let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) };
483         overlapped.hEvent = event.as_raw_handle();
484         Ok(AsyncPipe {
485             pipe,
486             overlapped,
487             event,
488             dst,
489             state: State::NotReading,
490         })
491     }
492 
493     /// Executes an overlapped read operation.
494     ///
495     /// Must not currently be reading, and returns whether the pipe is currently
496     /// at EOF or not. If the pipe is not at EOF then `result()` must be called
497     /// to complete the read later on (may block), but if the pipe is at EOF
498     /// then `result()` should not be called as it will just block forever.
499     fn schedule_read(&mut self) -> io::Result<bool> {
500         assert_eq!(self.state, State::NotReading);
501         let amt = unsafe {
502             let slice = slice_to_end(self.dst);
503             self.pipe.read_overlapped(slice, &mut *self.overlapped)?
504         };
505 
506         // If this read finished immediately then our overlapped event will
507         // remain signaled (it was signaled coming in here) and we'll progress
508         // down to the method below.
509         //
510         // Otherwise the I/O operation is scheduled and the system set our event
511         // to not signaled, so we flag ourselves into the reading state and move
512         // on.
513         self.state = match amt {
514             Some(0) => return Ok(false),
515             Some(amt) => State::Read(amt),
516             None => State::Reading,
517         };
518         Ok(true)
519     }
520 
521     /// Wait for the result of the overlapped operation previously executed.
522     ///
523     /// Takes a parameter `wait` which indicates if this pipe is currently being
524     /// read whether the function should block waiting for the read to complete.
525     ///
526     /// Returns values:
527     ///
528     /// * `true` - finished any pending read and the pipe is not at EOF (keep
529     ///            going)
530     /// * `false` - finished any pending read and pipe is at EOF (stop issuing
531     ///             reads)
532     fn result(&mut self) -> io::Result<bool> {
533         let amt = match self.state {
534             State::NotReading => return Ok(true),
535             State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?,
536             State::Read(amt) => amt,
537         };
538         self.state = State::NotReading;
539         unsafe {
540             let len = self.dst.len();
541             self.dst.set_len(len + amt);
542         }
543         Ok(amt != 0)
544     }
545 
546     /// Finishes out reading this pipe entirely.
547     ///
548     /// Waits for any pending and schedule read, and then calls `read_to_end`
549     /// if necessary to read all the remaining information.
550     fn finish(&mut self) -> io::Result<()> {
551         while self.result()? && self.schedule_read()? {
552             // ...
553         }
554         Ok(())
555     }
556 }
557 
558 impl<'a> Drop for AsyncPipe<'a> {
559     fn drop(&mut self) {
560         match self.state {
561             State::Reading => {}
562             _ => return,
563         }
564 
565         // If we have a pending read operation, then we have to make sure that
566         // it's *done* before we actually drop this type. The kernel requires
567         // that the `OVERLAPPED` and buffer pointers are valid for the entire
568         // I/O operation.
569         //
570         // To do that, we call `CancelIo` to cancel any pending operation, and
571         // if that succeeds we wait for the overlapped result.
572         //
573         // If anything here fails, there's not really much we can do, so we leak
574         // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
575         if self.pipe.cancel_io().is_err() || self.result().is_err() {
576             let buf = mem::take(self.dst);
577             let overlapped = Box::new(unsafe { mem::zeroed() });
578             let overlapped = mem::replace(&mut self.overlapped, overlapped);
579             mem::forget((buf, overlapped));
580         }
581     }
582 }
583 
584 unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
585     if v.capacity() == 0 {
586         v.reserve(16);
587     }
588     if v.capacity() == v.len() {
589         v.reserve(1);
590     }
591     slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len())
592 }
593