1 use crate::std::os::windows::prelude::*; 2 3 use crate::std::ffi::OsStr; 4 use crate::std::io::{self, BorrowedCursor, IoSlice, IoSliceMut, Read}; 5 use crate::std::mem; 6 use crate::std::path::Path; 7 use crate::std::ptr; 8 use crate::std::slice; 9 use crate::std::sync::atomic::AtomicUsize; 10 use crate::std::sync::atomic::Ordering::SeqCst; 11 use crate::std::sys::c; 12 use crate::std::sys::fs::{File, OpenOptions}; 13 use crate::std::sys::handle::Handle; 14 use crate::std::sys::hashmap_random_keys; 15 use crate::std::sys_common::IntoInner; 16 17 //////////////////////////////////////////////////////////////////////////////// 18 // Anonymous pipes 19 //////////////////////////////////////////////////////////////////////////////// 20 21 pub struct AnonPipe { 22 inner: Handle, 23 } 24 25 impl IntoInner<Handle> for AnonPipe { 26 fn into_inner(self) -> Handle { 27 self.inner 28 } 29 } 30 31 pub struct Pipes { 32 pub ours: AnonPipe, 33 pub theirs: AnonPipe, 34 } 35 36 /// Although this looks similar to `anon_pipe` in the Unix module it's actually 37 /// subtly different. Here we'll return two pipes in the `Pipes` return value, 38 /// but one is intended for "us" where as the other is intended for "someone 39 /// else". 40 /// 41 /// Currently the only use case for this function is pipes for stdio on 42 /// processes in the standard library, so "ours" is the one that'll stay in our 43 /// process whereas "theirs" will be inherited to a child. 44 /// 45 /// The ours/theirs pipes are *not* specifically readable or writable. Each 46 /// one only supports a read or a write, but which is which depends on the 47 /// boolean flag given. If `ours_readable` is `true`, then `ours` is readable and 48 /// `theirs` is writable. Conversely, if `ours_readable` is `false`, then `ours` 49 /// is writable and `theirs` is readable. 50 /// 51 /// Also note that the `ours` pipe is always a handle opened up in overlapped 52 /// mode. This means that technically speaking it should only ever be used 53 /// with `OVERLAPPED` instances, but also works out ok if it's only ever used 54 /// once at a time (which we do indeed guarantee). 55 pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> { 56 // A 64kb pipe capacity is the same as a typical Linux default. 57 const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024; 58 59 // Note that we specifically do *not* use `CreatePipe` here because 60 // unfortunately the anonymous pipes returned do not support overlapped 61 // operations. Instead, we create a "hopefully unique" name and create a 62 // named pipe which has overlapped operations enabled. 63 // 64 // Once we do this, we connect do it as usual via `CreateFileW`, and then 65 // we return those reader/writer halves. Note that the `ours` pipe return 66 // value is always the named pipe, whereas `theirs` is just the normal file. 67 // This should hopefully shield us from child processes which assume their 68 // stdout is a named pipe, which would indeed be odd! 69 unsafe { 70 let ours; 71 let mut name; 72 let mut tries = 0; 73 let mut reject_remote_clients_flag = c::PIPE_REJECT_REMOTE_CLIENTS; 74 loop { 75 tries += 1; 76 name = format!( 77 r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}", 78 c::GetCurrentProcessId(), 79 random_number() 80 ); 81 let wide_name = OsStr::new(&name) 82 .encode_wide() 83 .chain(Some(0)) 84 .collect::<Vec<_>>(); 85 let mut flags = c::FILE_FLAG_FIRST_PIPE_INSTANCE | c::FILE_FLAG_OVERLAPPED; 86 if ours_readable { 87 flags |= c::PIPE_ACCESS_INBOUND; 88 } else { 89 flags |= c::PIPE_ACCESS_OUTBOUND; 90 } 91 92 let handle = c::CreateNamedPipeW( 93 wide_name.as_ptr(), 94 flags, 95 c::PIPE_TYPE_BYTE 96 | c::PIPE_READMODE_BYTE 97 | c::PIPE_WAIT 98 | reject_remote_clients_flag, 99 1, 100 PIPE_BUFFER_CAPACITY, 101 PIPE_BUFFER_CAPACITY, 102 0, 103 ptr::null_mut(), 104 ); 105 106 // We pass the `FILE_FLAG_FIRST_PIPE_INSTANCE` flag above, and we're 107 // also just doing a best effort at selecting a unique name. If 108 // `ERROR_ACCESS_DENIED` is returned then it could mean that we 109 // accidentally conflicted with an already existing pipe, so we try 110 // again. 111 // 112 // Don't try again too much though as this could also perhaps be a 113 // legit error. 114 // If `ERROR_INVALID_PARAMETER` is returned, this probably means we're 115 // running on pre-Vista version where `PIPE_REJECT_REMOTE_CLIENTS` is 116 // not supported, so we continue retrying without it. This implies 117 // reduced security on Windows versions older than Vista by allowing 118 // connections to this pipe from remote machines. 119 // Proper fix would increase the number of FFI imports and introduce 120 // significant amount of Windows XP specific code with no clean 121 // testing strategy 122 // For more info, see https://github.com/rust-lang/rust/pull/37677. 123 if handle == c::INVALID_HANDLE_VALUE { 124 let err = io::Error::last_os_error(); 125 let raw_os_err = err.raw_os_error(); 126 if tries < 10 { 127 if raw_os_err == Some(c::ERROR_ACCESS_DENIED as i32) { 128 continue; 129 } else if reject_remote_clients_flag != 0 130 && raw_os_err == Some(c::ERROR_INVALID_PARAMETER as i32) 131 { 132 reject_remote_clients_flag = 0; 133 tries -= 1; 134 continue; 135 } 136 } 137 return Err(err); 138 } 139 ours = Handle::from_raw_handle(handle); 140 break; 141 } 142 143 // Connect to the named pipe we just created. This handle is going to be 144 // returned in `theirs`, so if `ours` is readable we want this to be 145 // writable, otherwise if `ours` is writable we want this to be 146 // readable. 147 // 148 // Additionally we don't enable overlapped mode on this because most 149 // client processes aren't enabled to work with that. 150 let mut opts = OpenOptions::new(); 151 opts.write(ours_readable); 152 opts.read(!ours_readable); 153 opts.share_mode(0); 154 let size = mem::size_of::<c::SECURITY_ATTRIBUTES>(); 155 let mut sa = c::SECURITY_ATTRIBUTES { 156 nLength: size as c::DWORD, 157 lpSecurityDescriptor: ptr::null_mut(), 158 bInheritHandle: their_handle_inheritable as i32, 159 }; 160 opts.security_attributes(&mut sa); 161 let theirs = File::open(Path::new(&name), &opts)?; 162 let theirs = AnonPipe { 163 inner: theirs.into_inner(), 164 }; 165 166 Ok(Pipes { 167 ours: AnonPipe { inner: ours }, 168 theirs: AnonPipe { 169 inner: theirs.into_inner(), 170 }, 171 }) 172 } 173 } 174 175 /// Takes an asynchronous source pipe and returns a synchronous pipe suitable 176 /// for sending to a child process. 177 /// 178 /// This is achieved by creating a new set of pipes and spawning a thread that 179 /// relays messages between the source and the synchronous pipe. 180 pub fn spawn_pipe_relay( 181 source: &AnonPipe, 182 ours_readable: bool, 183 their_handle_inheritable: bool, 184 ) -> io::Result<AnonPipe> { 185 // We need this handle to live for the lifetime of the thread spawned below. 186 let source = source.duplicate()?; 187 188 // create a new pair of anon pipes. 189 let Pipes { theirs, ours } = anon_pipe(ours_readable, their_handle_inheritable)?; 190 191 // Spawn a thread that passes messages from one pipe to the other. 192 // Any errors will simply cause the thread to exit. 193 let (reader, writer) = if ours_readable { 194 (ours, source) 195 } else { 196 (source, ours) 197 }; 198 crate::std::thread::spawn(move || { 199 let mut buf = [0_u8; 4096]; 200 'reader: while let Ok(len) = reader.read(&mut buf) { 201 if len == 0 { 202 break; 203 } 204 let mut start = 0; 205 while let Ok(written) = writer.write(&buf[start..len]) { 206 start += written; 207 if start == len { 208 continue 'reader; 209 } 210 } 211 break; 212 } 213 }); 214 215 // Return the pipe that should be sent to the child process. 216 Ok(theirs) 217 } 218 219 fn random_number() -> usize { 220 static N: AtomicUsize = AtomicUsize::new(0); 221 loop { 222 if N.load(SeqCst) != 0 { 223 return N.fetch_add(1, SeqCst); 224 } 225 226 N.store(hashmap_random_keys().0 as usize, SeqCst); 227 } 228 } 229 230 // Abstracts over `ReadFileEx` and `WriteFileEx` 231 type AlertableIoFn = unsafe extern "system" fn( 232 BorrowedHandle<'_>, 233 c::LPVOID, 234 c::DWORD, 235 c::LPOVERLAPPED, 236 c::LPOVERLAPPED_COMPLETION_ROUTINE, 237 ) -> c::BOOL; 238 239 impl AnonPipe { 240 pub fn handle(&self) -> &Handle { 241 &self.inner 242 } 243 pub fn into_handle(self) -> Handle { 244 self.inner 245 } 246 fn duplicate(&self) -> io::Result<Self> { 247 self.inner 248 .duplicate(0, false, c::DUPLICATE_SAME_ACCESS) 249 .map(|inner| AnonPipe { inner }) 250 } 251 252 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { 253 let result = unsafe { 254 let len = crate::std::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD; 255 self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len) 256 }; 257 258 match result { 259 // The special treatment of BrokenPipe is to deal with Windows 260 // pipe semantics, which yields this error when *reading* from 261 // a pipe after the other end has closed; we interpret that as 262 // EOF on the pipe. 263 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0), 264 _ => result, 265 } 266 } 267 268 pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> { 269 let result = unsafe { 270 let len = crate::std::cmp::min(buf.capacity(), c::DWORD::MAX as usize) as c::DWORD; 271 self.alertable_io_internal(c::ReadFileEx, buf.as_mut().as_mut_ptr() as _, len) 272 }; 273 274 match result { 275 // The special treatment of BrokenPipe is to deal with Windows 276 // pipe semantics, which yields this error when *reading* from 277 // a pipe after the other end has closed; we interpret that as 278 // EOF on the pipe. 279 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()), 280 Err(e) => Err(e), 281 Ok(n) => { 282 unsafe { 283 buf.advance(n); 284 } 285 Ok(()) 286 } 287 } 288 } 289 290 pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 291 self.inner.read_vectored(bufs) 292 } 293 294 #[inline] 295 pub fn is_read_vectored(&self) -> bool { 296 self.inner.is_read_vectored() 297 } 298 299 pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> { 300 self.handle().read_to_end(buf) 301 } 302 303 pub fn write(&self, buf: &[u8]) -> io::Result<usize> { 304 unsafe { 305 let len = crate::std::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD; 306 self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len) 307 } 308 } 309 310 pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 311 self.inner.write_vectored(bufs) 312 } 313 314 #[inline] 315 pub fn is_write_vectored(&self) -> bool { 316 self.inner.is_write_vectored() 317 } 318 319 /// Synchronizes asynchronous reads or writes using our anonymous pipe. 320 /// 321 /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses 322 /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes. 323 /// 324 /// Note: This should not be used for handles we don't create. 325 /// 326 /// # Safety 327 /// 328 /// `buf` must be a pointer to a buffer that's valid for reads or writes 329 /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx` 330 /// 331 /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex 332 /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex 333 /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls 334 unsafe fn alertable_io_internal( 335 &self, 336 io: AlertableIoFn, 337 buf: c::LPVOID, 338 len: c::DWORD, 339 ) -> io::Result<usize> { 340 // Use "alertable I/O" to synchronize the pipe I/O. 341 // This has four steps. 342 // 343 // STEP 1: Start the asynchronous I/O operation. 344 // This simply calls either `ReadFileEx` or `WriteFileEx`, 345 // giving it a pointer to the buffer and callback function. 346 // 347 // STEP 2: Enter an alertable state. 348 // The callback set in step 1 will not be called until the thread 349 // enters an "alertable" state. This can be done using `SleepEx`. 350 // 351 // STEP 3: The callback 352 // Once the I/O is complete and the thread is in an alertable state, 353 // the callback will be run on the same thread as the call to 354 // `ReadFileEx` or `WriteFileEx` done in step 1. 355 // In the callback we simply set the result of the async operation. 356 // 357 // STEP 4: Return the result. 358 // At this point we'll have a result from the callback function 359 // and can simply return it. Note that we must not return earlier, 360 // while the I/O is still in progress. 361 362 // The result that will be set from the asynchronous callback. 363 let mut async_result: Option<AsyncResult> = None; 364 struct AsyncResult { 365 error: u32, 366 transferred: u32, 367 } 368 369 // STEP 3: The callback. 370 unsafe extern "system" fn callback( 371 dwErrorCode: u32, 372 dwNumberOfBytesTransferred: u32, 373 lpOverlapped: *mut c::OVERLAPPED, 374 ) { 375 // Set `async_result` using a pointer smuggled through `hEvent`. 376 let result = AsyncResult { 377 error: dwErrorCode, 378 transferred: dwNumberOfBytesTransferred, 379 }; 380 *(*lpOverlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result); 381 } 382 383 // STEP 1: Start the I/O operation. 384 let mut overlapped: c::OVERLAPPED = crate::std::mem::zeroed(); 385 // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`. 386 // Therefore the documentation suggests using it to smuggle a pointer to the callback. 387 overlapped.hEvent = &mut async_result as *mut _ as *mut _; 388 389 // Asynchronous read of the pipe. 390 // If successful, `callback` will be called once it completes. 391 let result = io( 392 self.inner.as_handle(), 393 buf, 394 len, 395 &mut overlapped, 396 Some(callback), 397 ); 398 if result == c::FALSE { 399 // We can return here because the call failed. 400 // After this we must not return until the I/O completes. 401 return Err(io::Error::last_os_error()); 402 } 403 404 // Wait indefinitely for the result. 405 let result = loop { 406 // STEP 2: Enter an alertable state. 407 // The second parameter of `SleepEx` is used to make this sleep alertable. 408 c::SleepEx(c::INFINITE, c::TRUE); 409 if let Some(result) = async_result { 410 break result; 411 } 412 }; 413 // STEP 4: Return the result. 414 // `async_result` is always `Some` at this point 415 match result.error { 416 c::ERROR_SUCCESS => Ok(result.transferred as usize), 417 error => Err(io::Error::from_raw_os_error(error as _)), 418 } 419 } 420 } 421 422 pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> { 423 let p1 = p1.into_handle(); 424 let p2 = p2.into_handle(); 425 426 let mut p1 = AsyncPipe::new(p1, v1)?; 427 let mut p2 = AsyncPipe::new(p2, v2)?; 428 let objs = [p1.event.as_raw_handle(), p2.event.as_raw_handle()]; 429 430 // In a loop we wait for either pipe's scheduled read operation to complete. 431 // If the operation completes with 0 bytes, that means EOF was reached, in 432 // which case we just finish out the other pipe entirely. 433 // 434 // Note that overlapped I/O is in general super unsafe because we have to 435 // be careful to ensure that all pointers in play are valid for the entire 436 // duration of the I/O operation (where tons of operations can also fail). 437 // The destructor for `AsyncPipe` ends up taking care of most of this. 438 loop { 439 let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) }; 440 if res == c::WAIT_OBJECT_0 { 441 if !p1.result()? || !p1.schedule_read()? { 442 return p2.finish(); 443 } 444 } else if res == c::WAIT_OBJECT_0 + 1 { 445 if !p2.result()? || !p2.schedule_read()? { 446 return p1.finish(); 447 } 448 } else { 449 return Err(io::Error::last_os_error()); 450 } 451 } 452 } 453 454 struct AsyncPipe<'a> { 455 pipe: Handle, 456 event: Handle, 457 overlapped: Box<c::OVERLAPPED>, // needs a stable address 458 dst: &'a mut Vec<u8>, 459 state: State, 460 } 461 462 #[derive(PartialEq, Debug)] 463 enum State { 464 NotReading, 465 Reading, 466 Read(usize), 467 } 468 469 impl<'a> AsyncPipe<'a> { 470 fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> { 471 // Create an event which we'll use to coordinate our overlapped 472 // operations, this event will be used in WaitForMultipleObjects 473 // and passed as part of the OVERLAPPED handle. 474 // 475 // Note that we do a somewhat clever thing here by flagging the 476 // event as being manually reset and setting it initially to the 477 // signaled state. This means that we'll naturally fall through the 478 // WaitForMultipleObjects call above for pipes created initially, 479 // and the only time an even will go back to "unset" will be once an 480 // I/O operation is successfully scheduled (what we want). 481 let event = Handle::new_event(true, true)?; 482 let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) }; 483 overlapped.hEvent = event.as_raw_handle(); 484 Ok(AsyncPipe { 485 pipe, 486 overlapped, 487 event, 488 dst, 489 state: State::NotReading, 490 }) 491 } 492 493 /// Executes an overlapped read operation. 494 /// 495 /// Must not currently be reading, and returns whether the pipe is currently 496 /// at EOF or not. If the pipe is not at EOF then `result()` must be called 497 /// to complete the read later on (may block), but if the pipe is at EOF 498 /// then `result()` should not be called as it will just block forever. 499 fn schedule_read(&mut self) -> io::Result<bool> { 500 assert_eq!(self.state, State::NotReading); 501 let amt = unsafe { 502 let slice = slice_to_end(self.dst); 503 self.pipe.read_overlapped(slice, &mut *self.overlapped)? 504 }; 505 506 // If this read finished immediately then our overlapped event will 507 // remain signaled (it was signaled coming in here) and we'll progress 508 // down to the method below. 509 // 510 // Otherwise the I/O operation is scheduled and the system set our event 511 // to not signaled, so we flag ourselves into the reading state and move 512 // on. 513 self.state = match amt { 514 Some(0) => return Ok(false), 515 Some(amt) => State::Read(amt), 516 None => State::Reading, 517 }; 518 Ok(true) 519 } 520 521 /// Wait for the result of the overlapped operation previously executed. 522 /// 523 /// Takes a parameter `wait` which indicates if this pipe is currently being 524 /// read whether the function should block waiting for the read to complete. 525 /// 526 /// Returns values: 527 /// 528 /// * `true` - finished any pending read and the pipe is not at EOF (keep 529 /// going) 530 /// * `false` - finished any pending read and pipe is at EOF (stop issuing 531 /// reads) 532 fn result(&mut self) -> io::Result<bool> { 533 let amt = match self.state { 534 State::NotReading => return Ok(true), 535 State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?, 536 State::Read(amt) => amt, 537 }; 538 self.state = State::NotReading; 539 unsafe { 540 let len = self.dst.len(); 541 self.dst.set_len(len + amt); 542 } 543 Ok(amt != 0) 544 } 545 546 /// Finishes out reading this pipe entirely. 547 /// 548 /// Waits for any pending and schedule read, and then calls `read_to_end` 549 /// if necessary to read all the remaining information. 550 fn finish(&mut self) -> io::Result<()> { 551 while self.result()? && self.schedule_read()? { 552 // ... 553 } 554 Ok(()) 555 } 556 } 557 558 impl<'a> Drop for AsyncPipe<'a> { 559 fn drop(&mut self) { 560 match self.state { 561 State::Reading => {} 562 _ => return, 563 } 564 565 // If we have a pending read operation, then we have to make sure that 566 // it's *done* before we actually drop this type. The kernel requires 567 // that the `OVERLAPPED` and buffer pointers are valid for the entire 568 // I/O operation. 569 // 570 // To do that, we call `CancelIo` to cancel any pending operation, and 571 // if that succeeds we wait for the overlapped result. 572 // 573 // If anything here fails, there's not really much we can do, so we leak 574 // the buffer/OVERLAPPED pointers to ensure we're at least memory safe. 575 if self.pipe.cancel_io().is_err() || self.result().is_err() { 576 let buf = mem::take(self.dst); 577 let overlapped = Box::new(unsafe { mem::zeroed() }); 578 let overlapped = mem::replace(&mut self.overlapped, overlapped); 579 mem::forget((buf, overlapped)); 580 } 581 } 582 } 583 584 unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] { 585 if v.capacity() == 0 { 586 v.reserve(16); 587 } 588 if v.capacity() == v.len() { 589 v.reserve(1); 590 } 591 slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len()) 592 } 593