diff options
author | John Nunley <jtnunley01@gmail.com> | 2023-10-07 12:56:11 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-07 19:56:11 +0000 |
commit | c2f8abecfbaf6b6388e7746b733b7f22cbb7a750 (patch) | |
tree | 3b8e3cb638d25346f7147001ee57cffa46d52f80 /alacritty_terminal | |
parent | ace987f343649ae98e5fb63cf825414855ccd86e (diff) | |
download | alacritty-c2f8abecfbaf6b6388e7746b733b7f22cbb7a750.tar.gz alacritty-c2f8abecfbaf6b6388e7746b733b7f22cbb7a750.zip |
Port from mio to polling
This patch replaces the mio crate with the polling. Now that
smol-rs/polling#96 has been merged, we should be at full feature parity
with mio v0.6 now.
Fixes #7104.
Fixes #6486.
Diffstat (limited to 'alacritty_terminal')
-rw-r--r-- | alacritty_terminal/Cargo.toml | 10 | ||||
-rw-r--r-- | alacritty_terminal/src/event_loop.rs | 323 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/mod.rs | 22 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/unix.rs | 129 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/windows/blocking.rs | 276 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/windows/child.rs | 65 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/windows/conpty.rs | 9 | ||||
-rw-r--r-- | alacritty_terminal/src/tty/windows/mod.rs | 112 |
8 files changed, 606 insertions, 340 deletions
diff --git a/alacritty_terminal/Cargo.toml b/alacritty_terminal/Cargo.toml index dc164413..7498afab 100644 --- a/alacritty_terminal/Cargo.toml +++ b/alacritty_terminal/Cargo.toml @@ -23,9 +23,8 @@ bitflags = { version = "2.2.1", features = ["serde"] } home = "0.5.5" libc = "0.2" log = "0.4" -mio = "0.6.20" -mio-extras = "2" parking_lot = "0.12.0" +polling = "3.0.0" regex-automata = "0.3.6" serde = { version = "1", features = ["derive", "rc"] } serde_yaml = "0.8" @@ -36,12 +35,11 @@ vte = { version = "0.12.0", default-features = false, features = ["ansi", "serde [target.'cfg(unix)'.dependencies] nix = { version = "0.26.2", default-features = false, features = ["term"] } signal-hook = "0.3.10" -signal-hook-mio = { version = "0.2.1", features = ["support-v0_6"] } [target.'cfg(windows)'.dependencies] -mio-anonymous-pipes = "0.2" -miow = "0.3" -windows-sys = { version = "0.48", features = [ +piper = "0.2.1" +miow = "0.3.0" +windows-sys = { version = "0.48.0", features = [ "Win32_System_Console", "Win32_Foundation", "Win32_Security", diff --git a/alacritty_terminal/src/event_loop.rs b/alacritty_terminal/src/event_loop.rs index 61dc69bc..de22e49d 100644 --- a/alacritty_terminal/src/event_loop.rs +++ b/alacritty_terminal/src/event_loop.rs @@ -5,15 +5,14 @@ use std::collections::VecDeque; use std::fs::File; use std::io::{self, ErrorKind, Read, Write}; use std::marker::Send; +use std::num::NonZeroUsize; +use std::sync::mpsc::{self, Receiver, Sender, TryRecvError}; use std::sync::Arc; use std::thread::JoinHandle; use std::time::Instant; use log::error; -#[cfg(not(windows))] -use mio::unix::UnixReady; -use mio::{self, Events, PollOpt, Ready}; -use mio_extras::channel::{self, Receiver, Sender}; +use polling::{Event as PollingEvent, Events, PollMode}; use crate::event::{self, Event, EventListener, WindowSize}; use crate::sync::FairMutex; @@ -21,7 +20,7 @@ use crate::term::Term; use crate::{ansi, thread, tty}; /// Max bytes to read from the PTY before forced terminal synchronization. -const READ_BUFFER_SIZE: usize = 0x10_0000; +pub(crate) const READ_BUFFER_SIZE: usize = 0x10_0000; /// Max bytes to read from the PTY while the terminal is locked. const MAX_LOCKED_READ: usize = u16::MAX as usize; @@ -39,14 +38,14 @@ pub enum Msg { Resize(WindowSize), } -/// The main event!.. loop. +/// The main event loop. /// /// Handles all the PTY I/O and runs the PTY parser which updates terminal /// state. pub struct EventLoop<T: tty::EventedPty, U: EventListener> { - poll: mio::Poll, + poll: Arc<polling::Poller>, pty: T, - rx: Receiver<Msg>, + rx: PeekableReceiver<Msg>, tx: Sender<Msg>, terminal: Arc<FairMutex<Term<U>>>, event_proxy: U, @@ -54,97 +53,6 @@ pub struct EventLoop<T: tty::EventedPty, U: EventListener> { ref_test: bool, } -/// Helper type which tracks how much of a buffer has been written. -struct Writing { - source: Cow<'static, [u8]>, - written: usize, -} - -pub struct Notifier(pub Sender<Msg>); - -impl event::Notify for Notifier { - fn notify<B>(&self, bytes: B) - where - B: Into<Cow<'static, [u8]>>, - { - let bytes = bytes.into(); - // terminal hangs if we send 0 bytes through. - if bytes.len() == 0 { - return; - } - - let _ = self.0.send(Msg::Input(bytes)); - } -} - -impl event::OnResize for Notifier { - fn on_resize(&mut self, window_size: WindowSize) { - let _ = self.0.send(Msg::Resize(window_size)); - } -} - -/// All of the mutable state needed to run the event loop. -/// -/// Contains list of items to write, current write state, etc. Anything that -/// would otherwise be mutated on the `EventLoop` goes here. -#[derive(Default)] -pub struct State { - write_list: VecDeque<Cow<'static, [u8]>>, - writing: Option<Writing>, - parser: ansi::Processor, -} - -impl State { - #[inline] - fn ensure_next(&mut self) { - if self.writing.is_none() { - self.goto_next(); - } - } - - #[inline] - fn goto_next(&mut self) { - self.writing = self.write_list.pop_front().map(Writing::new); - } - - #[inline] - fn take_current(&mut self) -> Option<Writing> { - self.writing.take() - } - - #[inline] - fn needs_write(&self) -> bool { - self.writing.is_some() || !self.write_list.is_empty() - } - - #[inline] - fn set_current(&mut self, new: Option<Writing>) { - self.writing = new; - } -} - -impl Writing { - #[inline] - fn new(c: Cow<'static, [u8]>) -> Writing { - Writing { source: c, written: 0 } - } - - #[inline] - fn advance(&mut self, n: usize) { - self.written += n; - } - - #[inline] - fn remaining_bytes(&self) -> &[u8] { - &self.source[self.written..] - } - - #[inline] - fn finished(&self) -> bool { - self.written >= self.source.len() - } -} - impl<T, U> EventLoop<T, U> where T: tty::EventedPty + event::OnResize + Send + 'static, @@ -158,12 +66,12 @@ where hold: bool, ref_test: bool, ) -> EventLoop<T, U> { - let (tx, rx) = channel::channel(); + let (tx, rx) = mpsc::channel(); EventLoop { - poll: mio::Poll::new().expect("create mio Poll"), + poll: polling::Poller::new().expect("create Poll").into(), pty, tx, - rx, + rx: PeekableReceiver::new(rx), terminal, event_proxy, hold, @@ -171,15 +79,15 @@ where } } - pub fn channel(&self) -> Sender<Msg> { - self.tx.clone() + pub fn channel(&self) -> EventLoopSender { + EventLoopSender { sender: self.tx.clone(), poller: self.poll.clone() } } /// Drain the channel. /// /// Returns `false` when a shutdown message was received. fn drain_recv_channel(&mut self, state: &mut State) -> bool { - while let Ok(msg) = self.rx.try_recv() { + while let Some(msg) = self.rx.recv() { match msg { Msg::Input(input) => state.write_list.push_back(input), Msg::Resize(window_size) => self.pty.on_resize(window_size), @@ -190,20 +98,6 @@ where true } - /// Returns a `bool` indicating whether or not the event loop should continue running. - #[inline] - fn channel_event(&mut self, token: mio::Token, state: &mut State) -> bool { - if !self.drain_recv_channel(state) { - return false; - } - - self.poll - .reregister(&self.rx, token, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()) - .unwrap(); - - true - } - #[inline] fn pty_read<X>( &mut self, @@ -313,17 +207,15 @@ where let mut state = State::default(); let mut buf = [0u8; READ_BUFFER_SIZE]; - let mut tokens = (0..).map(Into::into); - - let poll_opts = PollOpt::edge() | PollOpt::oneshot(); - - let channel_token = tokens.next().unwrap(); - self.poll.register(&self.rx, channel_token, Ready::readable(), poll_opts).unwrap(); + let poll_opts = PollMode::Level; + let mut interest = PollingEvent::readable(0); // Register TTY through EventedRW interface. - self.pty.register(&self.poll, &mut tokens, Ready::readable(), poll_opts).unwrap(); + unsafe { + self.pty.register(&self.poll, interest, poll_opts).unwrap(); + } - let mut events = Events::with_capacity(1024); + let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap()); let mut pipe = if self.ref_test { Some(File::create("./alacritty.recording").expect("create alacritty recording")) @@ -337,7 +229,8 @@ where let timeout = handler.sync_timeout().map(|st| st.saturating_duration_since(Instant::now())); - if let Err(err) = self.poll.poll(&mut events, timeout) { + events.clear(); + if let Err(err) = self.poll.wait(&mut events, timeout) { match err.kind() { ErrorKind::Interrupted => continue, _ => panic!("EventLoop polling error: {err:?}"), @@ -345,21 +238,20 @@ where } // Handle synchronized update timeout. - if events.is_empty() { + if events.is_empty() && self.rx.peek().is_none() { state.parser.stop_sync(&mut *self.terminal.lock()); self.event_proxy.send_event(Event::Wakeup); continue; } - for event in events.iter() { - match event.token() { - token if token == channel_token => { - if !self.channel_event(channel_token, &mut state) { - break 'event_loop; - } - }, + // Handle channel events, if there are any. + if !self.drain_recv_channel(&mut state) { + break; + } - token if token == self.pty.child_event_token() => { + for event in events.iter() { + match event.key { + tty::PTY_CHILD_EVENT_TOKEN => { if let Some(tty::ChildEvent::Exited) = self.pty.next_child_event() { if self.hold { // With hold enabled, make sure the PTY is drained. @@ -374,17 +266,13 @@ where } }, - token - if token == self.pty.read_token() - || token == self.pty.write_token() => - { - #[cfg(unix)] - if UnixReady::from(event.readiness()).is_hup() { + tty::PTY_READ_WRITE_TOKEN => { + if event.is_interrupt() { // Don't try to do I/O on a dead PTY. continue; } - if event.readiness().is_readable() { + if event.readable { if let Err(err) = self.pty_read(&mut state, &mut buf, pipe.as_mut()) { // On Linux, a `read` on the master side of a PTY can fail @@ -402,7 +290,7 @@ where } } - if event.readiness().is_writable() { + if event.writable { if let Err(err) = self.pty_write(&mut state) { error!("Error writing to PTY in event loop: {}", err); break 'event_loop; @@ -414,19 +302,152 @@ where } // Register write interest if necessary. - let mut interest = Ready::readable(); - if state.needs_write() { - interest.insert(Ready::writable()); + let needs_write = state.needs_write(); + if needs_write != interest.writable { + interest.writable = needs_write; + + // Re-register with new interest. + self.pty.reregister(&self.poll, interest, poll_opts).unwrap(); } - // Reregister with new interest. - self.pty.reregister(&self.poll, interest, poll_opts).unwrap(); } // The evented instances are not dropped here so deregister them explicitly. - let _ = self.poll.deregister(&self.rx); let _ = self.pty.deregister(&self.poll); (self, state) }) } } + +/// Helper type which tracks how much of a buffer has been written. +struct Writing { + source: Cow<'static, [u8]>, + written: usize, +} + +pub struct Notifier(pub EventLoopSender); + +impl event::Notify for Notifier { + fn notify<B>(&self, bytes: B) + where + B: Into<Cow<'static, [u8]>>, + { + let bytes = bytes.into(); + // Terminal hangs if we send 0 bytes through. + if bytes.len() == 0 { + return; + } + + self.0.send(Msg::Input(bytes)); + } +} + +impl event::OnResize for Notifier { + fn on_resize(&mut self, window_size: WindowSize) { + self.0.send(Msg::Resize(window_size)); + } +} + +pub struct EventLoopSender { + sender: Sender<Msg>, + poller: Arc<polling::Poller>, +} + +impl EventLoopSender { + pub fn send(&self, msg: Msg) { + let _ = self.sender.send(msg); + let _ = self.poller.notify(); + } +} + +/// All of the mutable state needed to run the event loop. +/// +/// Contains list of items to write, current write state, etc. Anything that +/// would otherwise be mutated on the `EventLoop` goes here. +#[derive(Default)] +pub struct State { + write_list: VecDeque<Cow<'static, [u8]>>, + writing: Option<Writing>, + parser: ansi::Processor, +} + +impl State { + #[inline] + fn ensure_next(&mut self) { + if self.writing.is_none() { + self.goto_next(); + } + } + + #[inline] + fn goto_next(&mut self) { + self.writing = self.write_list.pop_front().map(Writing::new); + } + + #[inline] + fn take_current(&mut self) -> Option<Writing> { + self.writing.take() + } + + #[inline] + fn needs_write(&self) -> bool { + self.writing.is_some() || !self.write_list.is_empty() + } + + #[inline] + fn set_current(&mut self, new: Option<Writing>) { + self.writing = new; + } +} + +impl Writing { + #[inline] + fn new(c: Cow<'static, [u8]>) -> Writing { + Writing { source: c, written: 0 } + } + + #[inline] + fn advance(&mut self, n: usize) { + self.written += n; + } + + #[inline] + fn remaining_bytes(&self) -> &[u8] { + &self.source[self.written..] + } + + #[inline] + fn finished(&self) -> bool { + self.written >= self.source.len() + } +} + +struct PeekableReceiver<T> { + rx: Receiver<T>, + peeked: Option<T>, +} + +impl<T> PeekableReceiver<T> { + fn new(rx: Receiver<T>) -> Self { + Self { rx, peeked: None } + } + + fn peek(&mut self) -> Option<&T> { + if self.peeked.is_none() { + self.peeked = self.rx.try_recv().ok(); + } + + self.peeked.as_ref() + } + + fn recv(&mut self) -> Option<T> { + if self.peeked.is_some() { + self.peeked.take() + } else { + match self.rx.try_recv() { + Err(TryRecvError::Disconnected) => panic!("event loop channel closed"), + res => res.ok(), + } + } + } +} diff --git a/alacritty_terminal/src/tty/mod.rs b/alacritty_terminal/src/tty/mod.rs index 4ce277b3..315f008c 100644 --- a/alacritty_terminal/src/tty/mod.rs +++ b/alacritty_terminal/src/tty/mod.rs @@ -1,10 +1,13 @@ //! TTY related functionality. use std::path::PathBuf; +use std::sync::Arc; use std::{env, io}; use crate::config::Config; +use polling::{Event, PollMode, Poller}; + #[cfg(not(windows))] mod unix; #[cfg(not(windows))] @@ -22,20 +25,15 @@ pub trait EventedReadWrite { type Reader: io::Read; type Writer: io::Write; - fn register( - &mut self, - _: &mio::Poll, - _: &mut dyn Iterator<Item = mio::Token>, - _: mio::Ready, - _: mio::PollOpt, - ) -> io::Result<()>; - fn reregister(&mut self, _: &mio::Poll, _: mio::Ready, _: mio::PollOpt) -> io::Result<()>; - fn deregister(&mut self, _: &mio::Poll) -> io::Result<()>; + /// # Safety + /// + /// The underlying sources must outlive their registration in the `Poller`. + unsafe fn register(&mut self, _: &Arc<Poller>, _: Event, _: PollMode) -> io::Result<()>; + fn reregister(&mut self, _: &Arc<Poller>, _: Event, _: PollMode) -> io::Result<()>; + fn deregister(&mut self, _: &Arc<Poller>) -> io::Result<()>; fn reader(&mut self) -> &mut Self::Reader; - fn read_token(&self) -> mio::Token; fn writer(&mut self) -> &mut Self::Writer; - fn write_token(&self) -> mio::Token; } /// Events concerning TTY child processes. @@ -51,8 +49,6 @@ pub enum ChildEvent { /// notified if the PTY child process does something we care about (other than writing to the TTY). /// In particular, this allows for race-free child exit notification on UNIX (cf. `SIGCHLD`). pub trait EventedPty: EventedReadWrite { - fn child_event_token(&self) -> mio::Token; - /// Tries to retrieve an event. /// /// Returns `Some(event)` on success, or `None` if there are no events to retrieve. diff --git a/alacritty_terminal/src/tty/unix.rs b/alacritty_terminal/src/tty/unix.rs index fd99edee..4523666e 100644 --- a/alacritty_terminal/src/tty/unix.rs +++ b/alacritty_terminal/src/tty/unix.rs @@ -2,26 +2,34 @@ use std::ffi::CStr; use std::fs::File; -use std::io::{Error, ErrorKind, Result}; +use std::io::{Error, ErrorKind, Read, Result}; use std::mem::MaybeUninit; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::os::unix::net::UnixStream; use std::os::unix::process::CommandExt; use std::process::{Child, Command, Stdio}; +use std::sync::Arc; use std::{env, ptr}; use libc::{self, c_int, winsize, TIOCSCTTY}; use log::error; -use mio::unix::EventedFd; use nix::pty::openpty; #[cfg(any(target_os = "linux", target_os = "macos"))] use nix::sys::termios::{self, InputFlags, SetArg}; +use polling::{Event, PollMode, Poller}; use signal_hook::consts as sigconsts; -use signal_hook_mio::v0_6::Signals; +use signal_hook::low_level::pipe as signal_pipe; use crate::config::PtyConfig; use crate::event::{OnResize, WindowSize}; use crate::tty::{ChildEvent, EventedPty, EventedReadWrite}; +// Interest in PTY read/writes. +pub(crate) const PTY_READ_WRITE_TOKEN: usize = 0; + +// Interest in new child events. +pub(crate) const PTY_CHILD_EVENT_TOKEN: usize = 1; + macro_rules! die { ($($arg:tt)*) => {{ error!($($arg)*); @@ -103,9 +111,7 @@ fn get_pw_entry(buf: &mut [i8; 1024]) -> Result<Passwd<'_>> { pub struct Pty { child: Child, file: File, - token: mio::Token, - signals: Signals, - signals_token: mio::Token, + signals: UnixStream, } impl Pty { @@ -254,7 +260,14 @@ pub fn new(config: &PtyConfig, window_size: WindowSize, window_id: u64) -> Resul } // Prepare signal handling before spawning child. - let signals = Signals::new([sigconsts::SIGCHLD]).expect("error preparing signal handling"); + let signals = { + let (sender, recv) = UnixStream::pair()?; + + // Register the recv end of the pipe for SIGCHLD. + signal_pipe::register(sigconsts::SIGCHLD, sender)?; + recv.set_nonblocking(true)?; + recv + }; match builder.spawn() { Ok(child) => { @@ -264,13 +277,7 @@ pub fn new(config: &PtyConfig, window_size: WindowSize, window_id: u64) -> Resul set_nonblocking(master); } - let mut pty = Pty { - child, - file: unsafe { File::from_raw_fd(master) }, - token: mio::Token::from(0), - signals, - signals_token: mio::Token::from(0), - }; + let mut pty = Pty { child, file: unsafe { File::from_raw_fd(master) }, signals }; pty.on_resize(window_size); Ok(pty) }, @@ -300,46 +307,47 @@ impl EventedReadWrite for Pty { type Writer = File; #[inline] - fn register( + unsafe fn register( &mut self, - poll: &mio::Poll, - token: &mut dyn Iterator<Item = mio::Token>, - interest: mio::Ready, - poll_opts: mio::PollOpt, + poll: &Arc<Poller>, + mut interest: Event, + poll_opts: PollMode, ) -> Result<()> { - self.token = token.next().unwrap(); - poll.register(&EventedFd(&self.file.as_raw_fd()), self.token, interest, poll_opts)?; + interest.key = PTY_READ_WRITE_TOKEN; + unsafe { + poll.add_with_mode(&self.file, interest, poll_opts)?; + } - self.signals_token = token.next().unwrap(); - poll.register( - &self.signals, - self.signals_token, - mio::Ready::readable(), - mio::PollOpt::level(), - ) + unsafe { + poll.add_with_mode( + &self.signals, + Event::readable(PTY_CHILD_EVENT_TOKEN), + PollMode::Level, + ) + } } #[inline] fn reregister( &mut self, - poll: &mio::Poll, - interest: mio::Ready, - poll_opts: mio::PollOpt, + poll: &Arc<Poller>, + mut interest: Event, + poll_opts: PollMode, ) -> Result<()> { - poll.reregister(&EventedFd(&self.file.as_raw_fd()), self.token, interest, poll_opts)?; + interest.key = PTY_READ_WRITE_TOKEN; + poll.modify_with_mode(&self.file, interest, poll_opts)?; - poll.reregister( + poll.modify_with_mode( &self.signals, - self.signals_token, - mio::Ready::readable(), - mio::PollOpt::level(), + Event::readable(PTY_CHILD_EVENT_TOKEN), + PollMode::Level, ) } #[inline] - fn deregister(&mut self, poll: &mio::Poll) -> Result<()> { - poll.deregister(&EventedFd(&self.file.as_raw_fd()))?; - poll.deregister(&self.signals) + fn deregister(&mut self, poll: &Arc<Poller>) -> Result<()> { + poll.delete(&self.file)?; + poll.delete(&self.signals) } #[inline] @@ -348,43 +356,32 @@ impl EventedReadWrite for Pty { } #[inline] - fn read_token(&self) -> mio::Token { - self.token - } - - #[inline] fn writer(&mut self) -> &mut File { &mut self.file } - - #[inline] - fn write_token(&self) -> mio::Token { - self.token - } } impl EventedPty for Pty { #[inline] fn next_child_event(&mut self) -> Option<ChildEvent> { - self.signals.pending().next().and_then(|signal| { - if signal != sigconsts::SIGCHLD { - return None; - } - - match self.child.try_wait() { - Err(e) => { - error!("Error checking child process termination: {}", e); - None - }, - Ok(None) => None, - Ok(_) => Some(ChildEvent::Exited), + // See if there has been a SIGCHLD. + let mut buf = [0u8; 1]; + if let Err(err) = self.signals.read(&mut buf) { + if err.kind() != ErrorKind::WouldBlock { + error!("Error reading from signal pipe: {}", err); } - }) - } + return None; + } - #[inline] - fn child_event_token(&self) -> mio::Token { - self.signals_token + // Match on the child process. + match self.child.try_wait() { + Err(err) => { + error!("Error checking child process termination: {}", err); + None + }, + Ok(None) => None, + Ok(_) => Some(ChildEvent::Exited), + } } } diff --git a/alacritty_terminal/src/tty/windows/blocking.rs b/alacritty_terminal/src/tty/windows/blocking.rs new file mode 100644 index 00000000..3c74be4a --- /dev/null +++ b/alacritty_terminal/src/tty/windows/blocking.rs @@ -0,0 +1,276 @@ +//! Code for running a reader/writer on another thread while driving it through `polling`. + +use std::io::prelude::*; +use std::marker::PhantomData; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Wake, Waker}; +use std::{io, thread}; + +use piper::{pipe, Reader, Writer}; +use polling::os::iocp::{CompletionPacket, PollerIocpExt}; +use polling::{Event, PollMode, Poller}; + +use crate::thread::spawn_named; + +struct Registration { + interest: Mutex<Option<Interest>>, + end: PipeEnd, +} + +#[derive(Copy, Clone)] +enum PipeEnd { + Reader, + Writer, +} + +struct Interest { + /// The event to send about completion. + event: Event, + + /// The poller to send the event to. + poller: Arc<Poller>, + + /// The mode that we are in. + mode: PollMode, +} + +/// Poll a reader in another thread. +pub struct UnblockedReader<R> { + /// The event to send about completion. + interest: Arc<Registration>, + + /// The pipe that we are reading from. + pipe: Reader, + + /// Is this the first time registering? + first_register: bool, + + /// We logically own the reader, but we don't actually use it. + _reader: PhantomData<R>, +} + +impl<R: Read + Send + 'static> UnblockedReader<R> { + /// Spawn a new unblocked reader. + pub fn new(mut source: R, pipe_capacity: usize) -> Self { + // Create a new pipe. + let (reader, mut writer) = pipe(pipe_capacity); + let interest = Arc::new(Registration { + interest: Mutex::<Option<Interest>>::new(None), + end: PipeEnd::Reader, + }); + + // Spawn the reader thread. + spawn_named("alacritty-tty-reader-thread", move || { + let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); + let mut context = Context::from_waker(&waker); + + loop { + // Read from the reader into the pipe. + match writer.poll_fill(&mut context, &mut source) { + Poll::Ready(Ok(0)) => { + // Either the pipe is closed or the reader is at its EOF. + // In any case, we are done. + return; + }, + + Poll::Ready(Ok(_)) => { + // Keep reading. + continue; + }, + + Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { + // We were interrupted; continue. + continue; + }, + + Poll::Ready(Err(e)) => { + log::error!("error writing to pipe: {}", e); + return; + }, + + Poll::Pending => { + // We are now waiting on the other end to advance. Park the + // thread until they do. + thread::park(); + }, + } + } + }); + + Self { interest, pipe: reader, first_register: true, _reader: PhantomData } + } + + /// Register interest in the reader. + pub fn register(&mut self, poller: &Arc<Poller>, event: Event, mode: PollMode) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = Some(Interest { event, poller: poller.clone(), mode }); + + // Send the event to start off with if we have any data. + if (!self.pipe.is_empty() && event.readable) || self.first_register { + self.first_register = false; + poller.post(CompletionPacket::new(event)).ok(); + } + } + + /// Deregister interest in the reader. + pub fn deregister(&self) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = None; + } + + /// Try to read from the reader. + pub fn try_read(&mut self, buf: &mut [u8]) -> usize { + let waker = Waker::from(self.interest.clone()); + + match self.pipe.poll_drain_bytes(&mut Context::from_waker(&waker), buf) { + Poll::Pending => 0, + Poll::Ready(n) => n, + } + } +} + +impl<R: Read + Send + 'static> Read for UnblockedReader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + Ok(self.try_read(buf)) + } +} + +/// Poll a writer in another thread. +pub struct UnblockedWriter<W> { + /// The interest to send about completion. + interest: Arc<Registration>, + + /// The pipe that we are writing to. + pipe: Writer, + + /// We logically own the writer, but we don't actually use it. + _reader: PhantomData<W>, +} + +impl<W: Write + Send + 'static> UnblockedWriter<W> { + /// Spawn a new unblocked writer. + pub fn new(mut sink: W, pipe_capacity: usize) -> Self { + // Create a new pipe. + let (mut reader, writer) = pipe(pipe_capacity); + let interest = Arc::new(Registration { + interest: Mutex::<Option<Interest>>::new(None), + end: PipeEnd::Writer, + }); + + // Spawn the writer thread. + spawn_named("alacritty-tty-writer-thread", move || { + let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); + let mut context = Context::from_waker(&waker); + + loop { + // Write from the pipe into the writer. + match reader.poll_drain(&mut context, &mut sink) { + Poll::Ready(Ok(0)) => { + // Either the pipe is closed or the writer is full. + // In any case, we are done. + return; + }, + + Poll::Ready(Ok(_)) => { + // Keep writing. + continue; + }, + + Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { + // We were interrupted; continue. + continue; + }, + + Poll::Ready(Err(e)) => { + log::error!("error writing to pipe: {}", e); + return; + }, + + Poll::Pending => { + // We are now waiting on the other end to advance. Park the + // thread until they do. + thread::park(); + }, + } + } + }); + + Self { interest, pipe: writer, _reader: PhantomData } + } + + /// Register interest in the writer. + pub fn register(&self, poller: &Arc<Poller>, event: Event, mode: PollMode) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = Some(Interest { event, poller: poller.clone(), mode }); + + // Send the event to start off with if we have room for data. + if !self.pipe.is_full() && event.writable { + poller.post(CompletionPacket::new(event)).ok(); + } + } + + /// Deregister interest in the writer. + pub fn deregister(&self) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = None; + } + + /// Try to write to the writer. + pub fn try_write(&mut self, buf: &[u8]) -> usize { + let waker = Waker::from(self.interest.clone()); + + match self.pipe.poll_fill_bytes(&mut Context::from_waker(&waker), buf) { + Poll::Pending => 0, + Poll::Ready(n) => n, + } + } +} + +impl<W: Write + Send + 'static> Write for UnblockedWriter<W> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + Ok(self.try_write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + // Nothing to flush. + Ok(()) + } +} + +struct ThreadWaker(thread::Thread); + +impl Wake for ThreadWaker { + fn wake(self: Arc<Self>) { + self.0.unpark(); + } + + fn wake_by_ref(self: &Arc<Self>) { + self.0.unpark(); + } +} + +impl Wake for Registration { + fn wake(self: Arc<Self>) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc<Self>) { + let mut interest_lock = self.interest.lock().unwrap(); + if let Some(interest) = interest_lock.as_ref() { + // Send the event to the poller. + let send_event = match self.end { + PipeEnd::Reader => interest.event.readable, + PipeEnd::Writer => interest.event.writable, + }; + + if send_event { + interest.poller.post(CompletionPacket::new(interest.event)).ok(); + + // Clear the event if we're in oneshot mode. + if matches!(interest.mode, PollMode::Oneshot | PollMode::EdgeOneshot) { + *interest_lock = None; + } + } + } + } +} diff --git a/alacritty_terminal/src/tty/windows/child.rs b/alacritty_terminal/src/tty/windows/child.rs index 19c8a195..6bc9ed20 100644 --- a/alacritty_terminal/src/tty/windows/child.rs +++ b/alacritty_terminal/src/tty/windows/child.rs @@ -1,8 +1,10 @@ use std::ffi::c_void; use std::io::Error; use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::{mpsc, Arc, Mutex}; -use mio_extras::channel::{channel, Receiver, Sender}; +use polling::os::iocp::{CompletionPacket, PollerIocpExt}; +use polling::{Event, Poller}; use windows_sys::Win32::Foundation::{BOOLEAN, HANDLE}; use windows_sys::Win32::System::Threading::{ @@ -12,27 +14,43 @@ use windows_sys::Win32::System::Threading::{ use crate::tty::ChildEvent; +struct Interest { + poller: Arc<Poller>, + event: Event, +} + +struct ChildExitSender { + sender: mpsc::Sender<ChildEvent>, + interest: Arc<Mutex<Option<Interest>>>, +} + /// WinAPI callback to run when child process exits. extern "system" fn child_exit_callback(ctx: *mut c_void, timed_out: BOOLEAN) { if timed_out != 0 { return; } - let event_tx: Box<_> = unsafe { Box::from_raw(ctx as *mut Sender<ChildEvent>) }; - let _ = event_tx.send(ChildEvent::Exited); + let event_tx: Box<_> = unsafe { Box::from_raw(ctx as *mut ChildExitSender) }; + let _ = event_tx.sender.send(ChildEvent::Exited); + let interest = event_tx.interest.lock().unwrap(); + if let Some(interest) = interest.as_ref() { + interest.poller.post(CompletionPacket::new(interest.event)).ok(); + } } pub struct ChildExitWatcher { wait_handle: AtomicPtr<c_void>, - event_rx: Receiver<ChildEvent>, + event_rx: mpsc::Receiver<ChildEvent>, + interest: Arc<Mutex<Option<Interest>>>, } impl ChildExitWatcher { pub fn new(child_handle: HANDLE) -> Result<ChildExitWatcher, Error> { - let (event_tx, event_rx) = channel::<ChildEvent>(); + let (event_tx, event_rx) = mpsc::channel(); let mut wait_handle: HANDLE = 0; - let sender_ref = Box::new(event_tx); + let interest = Arc::new(Mutex::new(None)); + let sender_ref = Box::new(ChildExitSender { sender: event_tx, interest: interest.clone() }); let success = unsafe { RegisterWaitForSingleObject( @@ -51,13 +69,22 @@ impl ChildExitWatcher { Ok(ChildExitWatcher { wait_handle: AtomicPtr::from(wait_handle as *mut c_void), event_rx, + interest, }) } } - pub fn event_rx(&self) -> &Receiver<ChildEvent> { + pub fn event_rx(&self) -> &mpsc::Receiver<ChildEvent> { &self.event_rx } + + pub fn register(&self, poller: &Arc<Poller>, event: Event) { + *self.interest.lock().unwrap() = Some(Interest { poller: poller.clone(), event }); + } + + pub fn deregister(&self) { + *self.interest.lock().unwrap() = None; + } } impl Drop for ChildExitWatcher { @@ -72,36 +99,28 @@ impl Drop for ChildExitWatcher { mod tests { use std::os::windows::io::AsRawHandle; use std::process::Command; + use std::sync::Arc; use std::time::Duration; - use mio::{Events, Poll, PollOpt, Ready, Token}; - + use super::super::PTY_CHILD_EVENT_TOKEN; use super::*; #[test] pub fn event_is_emitted_when_child_exits() { const WAIT_TIMEOUT: Duration = Duration::from_millis(200); + let poller = Arc::new(Poller::new().unwrap()); + let mut child = Command::new("cmd.exe").spawn().unwrap(); let child_exit_watcher = ChildExitWatcher::new(child.as_raw_handle() as HANDLE).unwrap(); - - let mut events = Events::with_capacity(1); - let poll = Poll::new().unwrap(); - let child_events_token = Token::from(0usize); - - poll.register( - child_exit_watcher.event_rx(), - child_events_token, - Ready::readable(), - PollOpt::oneshot(), - ) - .unwrap(); + child_exit_watcher.register(&poller, Event::readable(PTY_CHILD_EVENT_TOKEN)); child.kill().unwrap(); // Poll for the event or fail with timeout if nothing has been sent. - poll.poll(&mut events, Some(WAIT_TIMEOUT)).unwrap(); - assert_eq!(events.iter().next().unwrap().token(), child_events_token); + let mut events = polling::Events::new(); + poller.wait(&mut events, Some(WAIT_TIMEOUT)).unwrap(); + assert_eq!(events.iter().next().unwrap().key, PTY_CHILD_EVENT_TOKEN); // Verify that at least one `ChildEvent::Exited` was received. assert_eq!(child_exit_watcher.event_rx().try_recv(), Ok(ChildEvent::Exited)); } diff --git a/alacritty_terminal/src/tty/windows/conpty.rs b/alacritty_terminal/src/tty/windows/conpty.rs index c9ed631e..12189371 100644 --- a/alacritty_terminal/src/tty/windows/conpty.rs +++ b/alacritty_terminal/src/tty/windows/conpty.rs @@ -3,8 +3,6 @@ use std::io::Error; use std::os::windows::io::IntoRawHandle; use std::{mem, ptr}; -use mio_anonymous_pipes::{EventedAnonRead, EventedAnonWrite}; - use windows_sys::core::{HRESULT, PWSTR}; use windows_sys::Win32::Foundation::{HANDLE, S_OK}; use windows_sys::Win32::System::Console::{ @@ -21,9 +19,12 @@ use windows_sys::Win32::System::Threading::{ use crate::config::PtyConfig; use crate::event::{OnResize, WindowSize}; +use crate::tty::windows::blocking::{UnblockedReader, UnblockedWriter}; use crate::tty::windows::child::ChildExitWatcher; use crate::tty::windows::{cmdline, win32_string, Pty}; +const PIPE_CAPACITY: usize = crate::event_loop::READ_BUFFER_SIZE; + /// Load the pseudoconsole API from conpty.dll if possible, otherwise use the /// standard Windows API. /// @@ -220,8 +221,8 @@ pub fn new(config: &PtyConfig, window_size: WindowSize) -> Option<Pty> { } } - let conin = EventedAnonWrite::new(conin); - let conout = EventedAnonRead::new(conout); + let conin = UnblockedWriter::new(conin, PIPE_CAPACITY); + let conout = UnblockedReader::new(conout, PIPE_CAPACITY); let child_watcher = ChildExitWatcher::new(proc_info.hProcess).unwrap(); let conpty = Conpty { handle: pty_handle as HPCON, api }; diff --git a/alacritty_terminal/src/tty/windows/mod.rs b/alacritty_terminal/src/tty/windows/mod.rs index 57925f4c..080f6e83 100644 --- a/alacritty_terminal/src/tty/windows/mod.rs +++ b/alacritty_terminal/src/tty/windows/mod.rs @@ -3,17 +3,27 @@ use std::io::{self, Error, ErrorKind, Result}; use std::iter::once; use std::os::windows::ffi::OsStrExt; use std::sync::mpsc::TryRecvError; +use std::sync::Arc; use crate::config::{Program, PtyConfig}; use crate::event::{OnResize, WindowSize}; use crate::tty::windows::child::ChildExitWatcher; use crate::tty::{ChildEvent, EventedPty, EventedReadWrite}; +mod blocking; mod child; mod conpty; +use blocking::{UnblockedReader, UnblockedWriter}; use conpty::Conpty as Backend; -use mio_anonymous_pipes::{EventedAnonRead as ReadPipe, EventedAnonWrite as WritePipe}; +use miow::pipe::{AnonRead, AnonWrite}; +use polling::{Event, Poller}; + +pub const PTY_CHILD_EVENT_TOKEN: usize = 1; +pub const PTY_READ_WRITE_TOKEN: usize = 2; + +type ReadPipe = UnblockedReader<AnonRead>; +type WritePipe = UnblockedWriter<AnonWrite>; pub struct Pty { // XXX: Backend is required to be the first field, to ensure correct drop order. Dropping @@ -21,9 +31,6 @@ pub struct Pty { backend: Backend, conout: ReadPipe, conin: WritePipe, - read_token: mio::Token, - write_token: mio::Token, - child_event_token: mio::Token, child_watcher: ChildExitWatcher, } @@ -39,51 +46,29 @@ impl Pty { conin: impl Into<WritePipe>, child_watcher: ChildExitWatcher, ) -> Self { - Self { - backend: backend.into(), - conout: conout.into(), - conin: conin.into(), - read_token: 0.into(), - write_token: 0.into(), - child_event_token: 0.into(), - child_watcher, - } + Self { backend: backend.into(), conout: conout.into(), conin: conin.into(), child_watcher } } } +fn with_key(mut event: Event, key: usize) -> Event { + event.key = key; + event +} + impl EventedReadWrite for Pty { type Reader = ReadPipe; type Writer = WritePipe; #[inline] - fn register( + unsafe fn register( &mut self, - poll: &mio::Poll, - token: &mut dyn Iterator<Item = mio::Token>, - interest: mio::Ready, - poll_opts: mio::PollOpt, + poll: &Arc<Poller>, + interest: polling::Event, + poll_opts: polling::PollMode, ) -> io::Result<()> { - self.read_token = token.next().unwrap(); - self.write_token = token.next().unwrap(); - - if interest.is_readable() { - poll.register(&self.conout, self.read_token, mio::Ready::readable(), poll_opts)? - } else { - poll.register(&self.conout, self.read_token, mio::Ready::empty(), poll_opts)? - } - if interest.is_writable() { - poll.register(&self.conin, self.write_token, mio::Ready::writable(), poll_opts)? - } else { - poll.register(&self.conin, self.write_token, mio::Ready::empty(), poll_opts)? - } - - self.child_event_token = token.next().unwrap(); - poll.register( - self.child_watcher.event_rx(), - self.child_event_token, - mio::Ready::readable(), - poll_opts, - )?; + self.conin.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.conout.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.child_watcher.register(poll, with_key(interest, PTY_CHILD_EVENT_TOKEN)); Ok(()) } @@ -91,36 +76,23 @@ impl EventedReadWrite for Pty { #[inline] fn reregister( &mut self, - poll: &mio::Poll, - interest: mio::Ready, - poll_opts: mio::PollOpt, + poll: &Arc<Poller>, + interest: polling::Event, + poll_opts: polling::PollMode, ) -> io::Result<()> { - if interest.is_readable() { - poll.reregister(&self.conout, self.read_token, mio::Ready::readable(), poll_opts)?; - } else { - poll.reregister(&self.conout, self.read_token, mio::Ready::empty(), poll_opts)?; - } - if interest.is_writable() { - poll.reregister(&self.conin, self.write_token, mio::Ready::writable(), poll_opts)?; - } else { - poll.reregister(&self.conin, self.write_token, mio::Ready::empty(), poll_opts)?; - } - - poll.reregister( - self.child_watcher.event_rx(), - self.child_event_token, - mio::Ready::readable(), - poll_opts, - )?; + self.conin.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.conout.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts); + self.child_watcher.register(poll, with_key(interest, PTY_CHILD_EVENT_TOKEN)); Ok(()) } #[inline] - fn deregister(&mut self, poll: &mio::Poll) -> io::Result<()> { - poll.deregister(&self.conout)?; - poll.deregister(&self.conin)?; - poll.deregister(self.child_watcher.event_rx())?; + fn deregister(&mut self, _poll: &Arc<Poller>) -> io::Result<()> { + self.conin.deregister(); + self.conout.deregister(); + self.child_watcher.deregister(); + Ok(()) } @@ -130,26 +102,12 @@ impl EventedReadWrite for Pty { } #[inline] - fn read_token(&self) -> mio::Token { - self.read_token - } - - #[inline] fn writer(&mut self) -> &mut Self::Writer { &mut self.conin } - - #[inline] - fn write_token(&self) -> mio::Token { - self.write_token - } } impl EventedPty for Pty { - fn child_event_token(&self) -> mio::Token { - self.child_event_token - } - fn next_child_event(&mut self) -> Option<ChildEvent> { match self.child_watcher.event_rx().try_recv() { Ok(ev) => Some(ev), |