diff options
author | John Nunley <jtnunley01@gmail.com> | 2023-10-07 12:56:11 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-07 19:56:11 +0000 |
commit | c2f8abecfbaf6b6388e7746b733b7f22cbb7a750 (patch) | |
tree | 3b8e3cb638d25346f7147001ee57cffa46d52f80 /alacritty_terminal/src/tty/windows | |
parent | ace987f343649ae98e5fb63cf825414855ccd86e (diff) | |
download | alacritty-c2f8abecfbaf6b6388e7746b733b7f22cbb7a750.tar.gz alacritty-c2f8abecfbaf6b6388e7746b733b7f22cbb7a750.zip |
Port from mio to polling
This patch replaces the mio crate with the polling. Now that
smol-rs/polling#96 has been merged, we should be at full feature parity
with mio v0.6 now.
Fixes #7104.
Fixes #6486.
Diffstat (limited to 'alacritty_terminal/src/tty/windows')
-rw-r--r-- | alacritty_terminal/src/tty/windows/blocking.rs | 276 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/windows/child.rs | 65 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/windows/conpty.rs | 9 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/windows/mod.rs | 112 |
4 files changed, 358 insertions, 104 deletions
diff --git a/alacritty_terminal/src/tty/windows/blocking.rs b/alacritty_terminal/src/tty/windows/blocking.rs new file mode 100644 index 00000000..3c74be4a --- /dev/null +++ b/alacritty_terminal/src/tty/windows/blocking.rs @@ -0,0 +1,276 @@ +//! Code for running a reader/writer on another thread while driving it through `polling`. + +use std::io::prelude::*; +use std::marker::PhantomData; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Wake, Waker}; +use std::{io, thread}; + +use piper::{pipe, Reader, Writer}; +use polling::os::iocp::{CompletionPacket, PollerIocpExt}; +use polling::{Event, PollMode, Poller}; + +use crate::thread::spawn_named; + +struct Registration { + interest: Mutex<Option<Interest>>, + end: PipeEnd, +} + +#[derive(Copy, Clone)] +enum PipeEnd { + Reader, + Writer, +} + +struct Interest { + /// The event to send about completion. + event: Event, + + /// The poller to send the event to. + poller: Arc<Poller>, + + /// The mode that we are in. + mode: PollMode, +} + +/// Poll a reader in another thread. +pub struct UnblockedReader<R> { + /// The event to send about completion. + interest: Arc<Registration>, + + /// The pipe that we are reading from. + pipe: Reader, + + /// Is this the first time registering? + first_register: bool, + + /// We logically own the reader, but we don't actually use it. + _reader: PhantomData<R>, +} + +impl<R: Read + Send + 'static> UnblockedReader<R> { + /// Spawn a new unblocked reader. + pub fn new(mut source: R, pipe_capacity: usize) -> Self { + // Create a new pipe. + let (reader, mut writer) = pipe(pipe_capacity); + let interest = Arc::new(Registration { + interest: Mutex::<Option<Interest>>::new(None), + end: PipeEnd::Reader, + }); + + // Spawn the reader thread. + spawn_named("alacritty-tty-reader-thread", move || { + let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); + let mut context = Context::from_waker(&waker); + + loop { + // Read from the reader into the pipe. + match writer.poll_fill(&mut context, &mut source) { + Poll::Ready(Ok(0)) => { + // Either the pipe is closed or the reader is at its EOF. + // In any case, we are done. + return; + }, + + Poll::Ready(Ok(_)) => { + // Keep reading. + continue; + }, + + Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { + // We were interrupted; continue. + continue; + }, + + Poll::Ready(Err(e)) => { + log::error!("error writing to pipe: {}", e); + return; + }, + + Poll::Pending => { + // We are now waiting on the other end to advance. Park the + // thread until they do. + thread::park(); + }, + } + } + }); + + Self { interest, pipe: reader, first_register: true, _reader: PhantomData } + } + + /// Register interest in the reader. + pub fn register(&mut self, poller: &Arc<Poller>, event: Event, mode: PollMode) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = Some(Interest { event, poller: poller.clone(), mode }); + + // Send the event to start off with if we have any data. + if (!self.pipe.is_empty() && event.readable) || self.first_register { + self.first_register = false; + poller.post(CompletionPacket::new(event)).ok(); + } + } + + /// Deregister interest in the reader. + pub fn deregister(&self) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = None; + } + + /// Try to read from the reader. + pub fn try_read(&mut self, buf: &mut [u8]) -> usize { + let waker = Waker::from(self.interest.clone()); + + match self.pipe.poll_drain_bytes(&mut Context::from_waker(&waker), buf) { + Poll::Pending => 0, + Poll::Ready(n) => n, + } + } +} + +impl<R: Read + Send + 'static> Read for UnblockedReader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + Ok(self.try_read(buf)) + } +} + +/// Poll a writer in another thread. +pub struct UnblockedWriter<W> { + /// The interest to send about completion. + interest: Arc<Registration>, + + /// The pipe that we are writing to. + pipe: Writer, + + /// We logically own the writer, but we don't actually use it. + _reader: PhantomData<W>, +} + +impl<W: Write + Send + 'static> UnblockedWriter<W> { + /// Spawn a new unblocked writer. + pub fn new(mut sink: W, pipe_capacity: usize) -> Self { + // Create a new pipe. + let (mut reader, writer) = pipe(pipe_capacity); + let interest = Arc::new(Registration { + interest: Mutex::<Option<Interest>>::new(None), + end: PipeEnd::Writer, + }); + + // Spawn the writer thread. + spawn_named("alacritty-tty-writer-thread", move || { + let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); + let mut context = Context::from_waker(&waker); + + loop { + // Write from the pipe into the writer. + match reader.poll_drain(&mut context, &mut sink) { + Poll::Ready(Ok(0)) => { + // Either the pipe is closed or the writer is full. + // In any case, we are done. + return; + }, + + Poll::Ready(Ok(_)) => { + // Keep writing. + continue; + }, + + Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { + // We were interrupted; continue. + continue; + }, + + Poll::Ready(Err(e)) => { + log::error!("error writing to pipe: {}", e); + return; + }, + + Poll::Pending => { + // We are now waiting on the other end to advance. Park the + // thread until they do. + thread::park(); + }, + } + } + }); + + Self { interest, pipe: writer, _reader: PhantomData } + } + + /// Register interest in the writer. + pub fn register(&self, poller: &Arc<Poller>, event: Event, mode: PollMode) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = Some(Interest { event, poller: poller.clone(), mode }); + + // Send the event to start off with if we have room for data. + if !self.pipe.is_full() && event.writable { + poller.post(CompletionPacket::new(event)).ok(); + } + } + + /// Deregister interest in the writer. + pub fn deregister(&self) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = None; + } + + /// Try to write to the writer. + pub fn try_write(&mut self, buf: &[u8]) -> usize { + let waker = Waker::from(self.interest.clone()); + + match self.pipe.poll_fill_bytes(&mut Context::from_waker(&waker), buf) { + Poll::Pending => 0, + Poll::Ready(n) => n, + } + } +} + +impl<W: Write + Send + 'static> Write for UnblockedWriter<W> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + Ok(self.try_write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + // Nothing to flush. + Ok(()) + } +} + +struct ThreadWaker(thread::Thread); + +impl Wake for ThreadWaker { + fn wake(self: Arc<Self>) { + self.0.unpark(); + } + + fn wake_by_ref(self: &Arc<Self>) { + self.0.unpark(); + } +} + +impl Wake for Registration { + fn wake(self: Arc<Self>) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc<Self>) { + let mut interest_lock = self.interest.lock().unwrap(); + if let Some(interest) = interest_lock.as_ref() { + // Send the event to the poller. + let send_event = match self.end { + PipeEnd::Reader => interest.event.readable, + PipeEnd::Writer => interest.event.writable, + }; + + if send_event { + interest.poller.post(CompletionPacket::new(interest.event)).ok(); + + // Clear the event if we're in oneshot mode. + if matches!(interest.mode, PollMode::Oneshot | PollMode::EdgeOneshot) { + *interest_lock = None; + } + } + } + } +} diff --git a/alacritty_terminal/src/tty/windows/child.rs b/alacritty_terminal/src/tty/windows/child.rs index 19c8a195..6bc9ed20 100644 --- a/alacritty_terminal/src/tty/windows/child.rs +++ b/alacritty_terminal/src/tty/windows/child.rs @@ -1,8 +1,10 @@ use std::ffi::c_void; use std::io::Error; use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::{mpsc, Arc, Mutex}; -use mio_extras::channel::{channel, Receiver, Sender}; +use polling::os::iocp::{CompletionPacket, PollerIocpExt}; +use polling::{Event, Poller}; use windows_sys::Win32::Foundation::{BOOLEAN, HANDLE}; use windows_sys::Win32::System::Threading::{ @@ -12,27 +14,43 @@ use windows_sys::Win32::System::Threading::{ use crate::tty::ChildEvent; +struct Interest { + poller: Arc<Poller>, + event: Event, +} + +struct ChildExitSender { + sender: mpsc::Sender<ChildEvent>, + interest: Arc<Mutex<Option<Interest>>>, +} + /// WinAPI callback to run when child process exits. extern "system" fn child_exit_callback(ctx: *mut c_void, timed_out: BOOLEAN) { if timed_out != 0 { return; } - let event_tx: Box<_> = unsafe { Box::from_raw(ctx as *mut Sender<ChildEvent>) }; - let _ = event_tx.send(ChildEvent::Exited); + let event_tx: Box<_> = unsafe { Box::from_raw(ctx as *mut ChildExitSender) }; + let _ = event_tx.sender.send(ChildEvent::Exited); + let interest = event_tx.interest.lock().unwrap(); + if let Some(interest) = interest.as_ref() { + interest.poller.post(CompletionPacket::new(interest.event)).ok(); + } } pub struct ChildExitWatcher { wait_handle: AtomicPtr<c_void>, - event_rx: Receiver<ChildEvent>, + event_rx: mpsc::Receiver<ChildEvent>, + interest: Arc<Mutex<Option<Interest>>>, } impl ChildExitWatcher { pub fn new(child_handle: HANDLE) -> Result<ChildExitWatcher, Error> { - let (event_tx, event_rx) = channel::<ChildEvent>(); + let (event_tx, event_rx) = mpsc::channel(); let mut wait_handle: HANDLE = 0; - let sender_ref = Box::new(event_tx); + let interest = Arc::new(Mutex::new(None)); + let sender_ref = Box::new(ChildExitSender { sender: event_tx, interest: interest.clone() }); let success = unsafe { RegisterWaitForSingleObject( @@ -51,13 +69,22 @@ impl ChildExitWatcher { Ok(ChildExitWatcher { wait_handle: AtomicPtr::from(wait_handle as *mut c_void), event_rx, + interest, }) } } - pub fn event_rx(&self) -> &Receiver<ChildEvent> { + pub fn event_rx(&self) -> &mpsc::Receiver<ChildEvent> { &self.event_rx } + + pub fn register(&self, poller: &Arc<Poller>, event: Event) { + *self.interest.lock().unwrap() = Some(Interest { poller: poller.clone(), event }); + } + + pub fn deregister(&self) { + *self.interest.lock().unwrap() = None; + } } impl Drop for ChildExitWatcher { @@ -72,36 +99,28 @@ impl Drop for ChildExitWatcher { mod tests { use std::os::windows::io::AsRawHandle; use std::process::Command; + use std::sync::Arc; use std::time::Duration; - use mio::{Events, Poll, PollOpt, Ready, Token}; - + use super::super::PTY_CHILD_EVENT_TOKEN; use super::*; #[test] pub fn event_is_emitted_when_child_exits() { const WAIT_TIMEOUT: Duration = Duration::from_millis(200); + let poller = Arc::new(Poller::new().unwrap()); + let mut child = Command::new("cmd.exe").spawn().unwrap(); let child_exit_watcher = ChildExitWatcher::new(child.as_raw_handle() as HANDLE).unwrap(); - - let mut events = Events::with_capacity(1); - let poll = Poll::new().unwrap(); - let child_events_token = Token::from(0usize); - - poll.register( - child_exit_watcher.event_rx(), - child_events_token, - Ready::readable(), - PollOpt::oneshot(), - ) - .unwrap(); + child_exit_watcher.register(&poller, Event::readable(PTY_CHILD_EVENT_TOKEN)); child.kill().unwrap(); // Poll for the event or fail with timeout if nothing has been sent. - poll.poll(&mut events, Some(WAIT_TIMEOUT)).unwrap(); - assert_eq!(events.iter().next().unwrap().token(), child_events_token); + let mut events = polling::Events::new(); + poller.wait(&mut events, Some(WAIT_TIMEOUT)).unwrap(); + assert_eq!(events.iter().next().unwrap().key, PTY_CHILD_EVENT_TOKEN); // Verify that at least one `ChildEvent::Exited` was received. assert_eq!(child_exit_watcher.event_rx().try_recv(), Ok(ChildEvent::Exited)); } diff --git a/alacritty_terminal/src/tty/windows/conpty.rs b/alacritty_terminal/src/tty/windows/conpty.rs index c9ed631e..12189371 100644 --- a/alacritty_terminal/src/tty/windows/conpty.rs +++ b/alacritty_terminal/src/tty/windows/conpty.rs @@ -3,8 +3,6 @@ use std::io::Error; use std::os::windows::io::IntoRawHandle; use std::{mem, ptr}; -use mio_anonymous_pipes::{EventedAnonRead, EventedAnonWrite}; - use windows_sys::core::{HRESULT, PWSTR}; use windows_sys::Win32::Foundation::{HANDLE, S_OK}; use windows_sys::Win32::System::Console::{ @@ -21,9 +19,12 @@ use windows_sys::Win32::System::Threading::{ use crate::config::PtyConfig; use crate::event::{OnResize, WindowSize}; +use crate::tty::windows::blocking::{UnblockedReader, UnblockedWriter}; use crate::tty::windows::child::ChildExitWatcher; use crate::tty::windows::{cmdline, win32_string, Pty}; +const PIPE_CAPACITY: usize = crate::event_loop::READ_BUFFER_SIZE; + /// Load the pseudoconsole API from conpty.dll if possible, otherwise use the /// standard Windows API. /// @@ -220,8 +221,8 @@ pub fn new(config: &PtyConfig, window_size: WindowSize) -> Option<Pty> { } } - let conin = EventedAnonWrite::new(conin); - let conout = EventedAnonRead::new(conout); + let conin = UnblockedWriter::new(conin, PIPE_CAPACITY); + let conout = UnblockedReader::new(conout, PIPE_CAPACITY); let child_watcher = ChildExitWatcher::new(proc_info.hProcess).unwrap(); let conpty = Conpty { handle: pty_handle as HPCON, api }; diff --git a/alacritty_terminal/src/tty/windows/mod.rs b/alacritty_terminal/src/tty/windows/mod.rs index 57925f4c..080f6e83 100644 --- a/alacritty_terminal/src/tty/windows/mod.rs +++ b/alacritty_terminal/src/tty/windows/mod.rs @@ -3,17 +3,27 @@ use std::io::{self, Error, ErrorKind, Result}; use std::iter::once; use std::os::windows::ffi::OsStrExt; use std::sync::mpsc::TryRecvError; +use std::sync::Arc; use crate::config::{Program, PtyConfig}; use crate::event::{OnResize, WindowSize}; use crate::tty::windows::child::ChildExitWatcher; use crate::tty::{ChildEvent, EventedPty, EventedReadWrite}; +mod blocking; mod child; mod conpty; +use blocking::{UnblockedReader, UnblockedWriter}; use conpty::Conpty as Backend; -use mio_anonymous_pipes::{EventedAnonRead as ReadPipe, EventedAnonWrite as WritePipe}; +use miow::pipe::{AnonRead, AnonWrite}; +use polling::{Event, Poller}; + +pub const PTY_CHILD_EVENT_TOKEN: usize = 1; +pub const PTY_READ_WRITE_TOKEN: usize = 2; + +type ReadPipe = UnblockedReader<AnonRead>; +type WritePipe = UnblockedWriter<AnonWrite>; pub struct Pty { // XXX: Backend is required to be the first field, to ensure correct drop order. Dropping @@ -21,9 +31,6 @@ pub struct Pty { backend: Backend, conout: ReadPipe, conin: WritePipe, - read_token: mio::Token, - write_token: mio::Token, - child_event_token: mio::Token, child_watcher: ChildExitWatcher, } @@ -39,51 +46,29 @@ impl Pty { conin: impl Into<WritePipe>, child_watcher: ChildExitWatcher, ) -> Self { - Self { - backend: backend.into(), - conout: conout.into(), - conin: conin.into(), - read_token: 0.into(), - write_token: 0.into(), - child_event_token: 0.into(), - child_watcher, - } + Self { backend: backend.into(), conout: conout.into(), conin: conin.into(), child_watcher } } } +fn with_key(mut event: Event, key: usize) -> Event { + event.key = key; + event +} + impl EventedReadWrite for Pty { type Reader = ReadPipe; type Writer = WritePipe; #[inline] - fn register( + unsafe fn register( &mut self, - poll: &mio::Poll, - token: &mut dyn Iterator<Item = mio::Token>, - interest: mio::Ready, - poll_opts: mio::PollOpt, + poll: &Arc<Poller>, + interest: polling::Event, + poll_opts: polling::PollMode, ) -> io::Result<()> { - self.read_token = token.next().unwrap(); - self.write_token = token.next().unwrap(); - - if interest.is_readable() { - poll.register(&self.conout, self.read_token, mio::Ready::readable(), poll_opts)? - } else { - poll.register(&self.conout, self.read_token, mio::Ready::empty(), poll_opts)? - } - if interest.is_writable() { - poll.register(&self.conin, self.write_token, mio::Ready::writable(), poll_opts)? - } else { - poll.register(&self.conin, self.write_token, mio::Ready::empty(), poll_opts)? - } - - self.child_event_token = token.next().unwrap(); - poll.register( - self.child_watcher.event_rx(), - self.child_event_token, - mio::Ready::readable(), - poll_opts, - )?; + self.conin.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.conout.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.child_watcher.register(poll, with_key(interest, PTY_CHILD_EVENT_TOKEN)); Ok(()) } @@ -91,36 +76,23 @@ impl EventedReadWrite for Pty { #[inline] fn reregister( &mut self, - poll: &mio::Poll, - interest: mio::Ready, - poll_opts: mio::PollOpt, + poll: &Arc<Poller>, + interest: polling::Event, + poll_opts: polling::PollMode, ) -> io::Result<()> { - if interest.is_readable() { - poll.reregister(&self.conout, self.read_token, mio::Ready::readable(), poll_opts)?; - } else { - poll.reregister(&self.conout, self.read_token, mio::Ready::empty(), poll_opts)?; - } - if interest.is_writable() { - poll.reregister(&self.conin, self.write_token, mio::Ready::writable(), poll_opts)?; - } else { - poll.reregister(&self.conin, self.write_token, mio::Ready::empty(), poll_opts)?; - } - - poll.reregister( - self.child_watcher.event_rx(), - self.child_event_token, - mio::Ready::readable(), - poll_opts, - )?; + self.conin.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.conout.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.child_watcher.register(poll, with_key(interest, PTY_CHILD_EVENT_TOKEN)); Ok(()) } #[inline] - fn deregister(&mut self, poll: &mio::Poll) -> io::Result<()> { - poll.deregister(&self.conout)?; - poll.deregister(&self.conin)?; - poll.deregister(self.child_watcher.event_rx())?; + fn deregister(&mut self, _poll: &Arc<Poller>) -> io::Result<()> { + self.conin.deregister(); + self.conout.deregister(); + self.child_watcher.deregister(); + Ok(()) } @@ -130,26 +102,12 @@ impl EventedReadWrite for Pty { } #[inline] - fn read_token(&self) -> mio::Token { - self.read_token - } - - #[inline] fn writer(&mut self) -> &mut Self::Writer { &mut self.conin } - - #[inline] - fn write_token(&self) -> mio::Token { - self.write_token - } } impl EventedPty for Pty { - fn child_event_token(&self) -> mio::Token { - self.child_event_token - } - fn next_child_event(&mut self) -> Option<ChildEvent> { match self.child_watcher.event_rx().try_recv() { Ok(ev) => Some(ev), |