aboutsummaryrefslogtreecommitdiff
path: root/alacritty_terminal
diff options
context:
space:
mode:
authorJohn Nunley <jtnunley01@gmail.com>2023-10-07 12:56:11 -0700
committerGitHub <noreply@github.com>2023-10-07 19:56:11 +0000
commitc2f8abecfbaf6b6388e7746b733b7f22cbb7a750 (patch)
tree3b8e3cb638d25346f7147001ee57cffa46d52f80 /alacritty_terminal
parentace987f343649ae98e5fb63cf825414855ccd86e (diff)
downloadalacritty-c2f8abecfbaf6b6388e7746b733b7f22cbb7a750.tar.gz
alacritty-c2f8abecfbaf6b6388e7746b733b7f22cbb7a750.zip
Port from mio to polling
This patch replaces the mio crate with the polling. Now that smol-rs/polling#96 has been merged, we should be at full feature parity with mio v0.6 now. Fixes #7104. Fixes #6486.
Diffstat (limited to 'alacritty_terminal')
-rw-r--r--alacritty_terminal/Cargo.toml10
-rw-r--r--alacritty_terminal/src/event_loop.rs323
-rw-r--r--alacritty_terminal/src/tty/mod.rs22
-rw-r--r--alacritty_terminal/src/tty/unix.rs129
-rw-r--r--alacritty_terminal/src/tty/windows/blocking.rs276
-rw-r--r--alacritty_terminal/src/tty/windows/child.rs65
-rw-r--r--alacritty_terminal/src/tty/windows/conpty.rs9
-rw-r--r--alacritty_terminal/src/tty/windows/mod.rs112
8 files changed, 606 insertions, 340 deletions
diff --git a/alacritty_terminal/Cargo.toml b/alacritty_terminal/Cargo.toml
index dc164413..7498afab 100644
--- a/alacritty_terminal/Cargo.toml
+++ b/alacritty_terminal/Cargo.toml
@@ -23,9 +23,8 @@ bitflags = { version = "2.2.1", features = ["serde"] }
home = "0.5.5"
libc = "0.2"
log = "0.4"
-mio = "0.6.20"
-mio-extras = "2"
parking_lot = "0.12.0"
+polling = "3.0.0"
regex-automata = "0.3.6"
serde = { version = "1", features = ["derive", "rc"] }
serde_yaml = "0.8"
@@ -36,12 +35,11 @@ vte = { version = "0.12.0", default-features = false, features = ["ansi", "serde
[target.'cfg(unix)'.dependencies]
nix = { version = "0.26.2", default-features = false, features = ["term"] }
signal-hook = "0.3.10"
-signal-hook-mio = { version = "0.2.1", features = ["support-v0_6"] }
[target.'cfg(windows)'.dependencies]
-mio-anonymous-pipes = "0.2"
-miow = "0.3"
-windows-sys = { version = "0.48", features = [
+piper = "0.2.1"
+miow = "0.3.0"
+windows-sys = { version = "0.48.0", features = [
"Win32_System_Console",
"Win32_Foundation",
"Win32_Security",
diff --git a/alacritty_terminal/src/event_loop.rs b/alacritty_terminal/src/event_loop.rs
index 61dc69bc..de22e49d 100644
--- a/alacritty_terminal/src/event_loop.rs
+++ b/alacritty_terminal/src/event_loop.rs
@@ -5,15 +5,14 @@ use std::collections::VecDeque;
use std::fs::File;
use std::io::{self, ErrorKind, Read, Write};
use std::marker::Send;
+use std::num::NonZeroUsize;
+use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
use log::error;
-#[cfg(not(windows))]
-use mio::unix::UnixReady;
-use mio::{self, Events, PollOpt, Ready};
-use mio_extras::channel::{self, Receiver, Sender};
+use polling::{Event as PollingEvent, Events, PollMode};
use crate::event::{self, Event, EventListener, WindowSize};
use crate::sync::FairMutex;
@@ -21,7 +20,7 @@ use crate::term::Term;
use crate::{ansi, thread, tty};
/// Max bytes to read from the PTY before forced terminal synchronization.
-const READ_BUFFER_SIZE: usize = 0x10_0000;
+pub(crate) const READ_BUFFER_SIZE: usize = 0x10_0000;
/// Max bytes to read from the PTY while the terminal is locked.
const MAX_LOCKED_READ: usize = u16::MAX as usize;
@@ -39,14 +38,14 @@ pub enum Msg {
Resize(WindowSize),
}
-/// The main event!.. loop.
+/// The main event loop.
///
/// Handles all the PTY I/O and runs the PTY parser which updates terminal
/// state.
pub struct EventLoop<T: tty::EventedPty, U: EventListener> {
- poll: mio::Poll,
+ poll: Arc<polling::Poller>,
pty: T,
- rx: Receiver<Msg>,
+ rx: PeekableReceiver<Msg>,
tx: Sender<Msg>,
terminal: Arc<FairMutex<Term<U>>>,
event_proxy: U,
@@ -54,97 +53,6 @@ pub struct EventLoop<T: tty::EventedPty, U: EventListener> {
ref_test: bool,
}
-/// Helper type which tracks how much of a buffer has been written.
-struct Writing {
- source: Cow<'static, [u8]>,
- written: usize,
-}
-
-pub struct Notifier(pub Sender<Msg>);
-
-impl event::Notify for Notifier {
- fn notify<B>(&self, bytes: B)
- where
- B: Into<Cow<'static, [u8]>>,
- {
- let bytes = bytes.into();
- // terminal hangs if we send 0 bytes through.
- if bytes.len() == 0 {
- return;
- }
-
- let _ = self.0.send(Msg::Input(bytes));
- }
-}
-
-impl event::OnResize for Notifier {
- fn on_resize(&mut self, window_size: WindowSize) {
- let _ = self.0.send(Msg::Resize(window_size));
- }
-}
-
-/// All of the mutable state needed to run the event loop.
-///
-/// Contains list of items to write, current write state, etc. Anything that
-/// would otherwise be mutated on the `EventLoop` goes here.
-#[derive(Default)]
-pub struct State {
- write_list: VecDeque<Cow<'static, [u8]>>,
- writing: Option<Writing>,
- parser: ansi::Processor,
-}
-
-impl State {
- #[inline]
- fn ensure_next(&mut self) {
- if self.writing.is_none() {
- self.goto_next();
- }
- }
-
- #[inline]
- fn goto_next(&mut self) {
- self.writing = self.write_list.pop_front().map(Writing::new);
- }
-
- #[inline]
- fn take_current(&mut self) -> Option<Writing> {
- self.writing.take()
- }
-
- #[inline]
- fn needs_write(&self) -> bool {
- self.writing.is_some() || !self.write_list.is_empty()
- }
-
- #[inline]
- fn set_current(&mut self, new: Option<Writing>) {
- self.writing = new;
- }
-}
-
-impl Writing {
- #[inline]
- fn new(c: Cow<'static, [u8]>) -> Writing {
- Writing { source: c, written: 0 }
- }
-
- #[inline]
- fn advance(&mut self, n: usize) {
- self.written += n;
- }
-
- #[inline]
- fn remaining_bytes(&self) -> &[u8] {
- &self.source[self.written..]
- }
-
- #[inline]
- fn finished(&self) -> bool {
- self.written >= self.source.len()
- }
-}
-
impl<T, U> EventLoop<T, U>
where
T: tty::EventedPty + event::OnResize + Send + 'static,
@@ -158,12 +66,12 @@ where
hold: bool,
ref_test: bool,
) -> EventLoop<T, U> {
- let (tx, rx) = channel::channel();
+ let (tx, rx) = mpsc::channel();
EventLoop {
- poll: mio::Poll::new().expect("create mio Poll"),
+ poll: polling::Poller::new().expect("create Poll").into(),
pty,
tx,
- rx,
+ rx: PeekableReceiver::new(rx),
terminal,
event_proxy,
hold,
@@ -171,15 +79,15 @@ where
}
}
- pub fn channel(&self) -> Sender<Msg> {
- self.tx.clone()
+ pub fn channel(&self) -> EventLoopSender {
+ EventLoopSender { sender: self.tx.clone(), poller: self.poll.clone() }
}
/// Drain the channel.
///
/// Returns `false` when a shutdown message was received.
fn drain_recv_channel(&mut self, state: &mut State) -> bool {
- while let Ok(msg) = self.rx.try_recv() {
+ while let Some(msg) = self.rx.recv() {
match msg {
Msg::Input(input) => state.write_list.push_back(input),
Msg::Resize(window_size) => self.pty.on_resize(window_size),
@@ -190,20 +98,6 @@ where
true
}
- /// Returns a `bool` indicating whether or not the event loop should continue running.
- #[inline]
- fn channel_event(&mut self, token: mio::Token, state: &mut State) -> bool {
- if !self.drain_recv_channel(state) {
- return false;
- }
-
- self.poll
- .reregister(&self.rx, token, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
- .unwrap();
-
- true
- }
-
#[inline]
fn pty_read<X>(
&mut self,
@@ -313,17 +207,15 @@ where
let mut state = State::default();
let mut buf = [0u8; READ_BUFFER_SIZE];
- let mut tokens = (0..).map(Into::into);
-
- let poll_opts = PollOpt::edge() | PollOpt::oneshot();
-
- let channel_token = tokens.next().unwrap();
- self.poll.register(&self.rx, channel_token, Ready::readable(), poll_opts).unwrap();
+ let poll_opts = PollMode::Level;
+ let mut interest = PollingEvent::readable(0);
// Register TTY through EventedRW interface.
- self.pty.register(&self.poll, &mut tokens, Ready::readable(), poll_opts).unwrap();
+ unsafe {
+ self.pty.register(&self.poll, interest, poll_opts).unwrap();
+ }
- let mut events = Events::with_capacity(1024);
+ let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap());
let mut pipe = if self.ref_test {
Some(File::create("./alacritty.recording").expect("create alacritty recording"))
@@ -337,7 +229,8 @@ where
let timeout =
handler.sync_timeout().map(|st| st.saturating_duration_since(Instant::now()));
- if let Err(err) = self.poll.poll(&mut events, timeout) {
+ events.clear();
+ if let Err(err) = self.poll.wait(&mut events, timeout) {
match err.kind() {
ErrorKind::Interrupted => continue,
_ => panic!("EventLoop polling error: {err:?}"),
@@ -345,21 +238,20 @@ where
}
// Handle synchronized update timeout.
- if events.is_empty() {
+ if events.is_empty() && self.rx.peek().is_none() {
state.parser.stop_sync(&mut *self.terminal.lock());
self.event_proxy.send_event(Event::Wakeup);
continue;
}
- for event in events.iter() {
- match event.token() {
- token if token == channel_token => {
- if !self.channel_event(channel_token, &mut state) {
- break 'event_loop;
- }
- },
+ // Handle channel events, if there are any.
+ if !self.drain_recv_channel(&mut state) {
+ break;
+ }
- token if token == self.pty.child_event_token() => {
+ for event in events.iter() {
+ match event.key {
+ tty::PTY_CHILD_EVENT_TOKEN => {
if let Some(tty::ChildEvent::Exited) = self.pty.next_child_event() {
if self.hold {
// With hold enabled, make sure the PTY is drained.
@@ -374,17 +266,13 @@ where
}
},
- token
- if token == self.pty.read_token()
- || token == self.pty.write_token() =>
- {
- #[cfg(unix)]
- if UnixReady::from(event.readiness()).is_hup() {
+ tty::PTY_READ_WRITE_TOKEN => {
+ if event.is_interrupt() {
// Don't try to do I/O on a dead PTY.
continue;
}
- if event.readiness().is_readable() {
+ if event.readable {
if let Err(err) = self.pty_read(&mut state, &mut buf, pipe.as_mut())
{
// On Linux, a `read` on the master side of a PTY can fail
@@ -402,7 +290,7 @@ where
}
}
- if event.readiness().is_writable() {
+ if event.writable {
if let Err(err) = self.pty_write(&mut state) {
error!("Error writing to PTY in event loop: {}", err);
break 'event_loop;
@@ -414,19 +302,152 @@ where
}
// Register write interest if necessary.
- let mut interest = Ready::readable();
- if state.needs_write() {
- interest.insert(Ready::writable());
+ let needs_write = state.needs_write();
+ if needs_write != interest.writable {
+ interest.writable = needs_write;
+
+ // Re-register with new interest.
+ self.pty.reregister(&self.poll, interest, poll_opts).unwrap();
}
- // Reregister with new interest.
- self.pty.reregister(&self.poll, interest, poll_opts).unwrap();
}
// The evented instances are not dropped here so deregister them explicitly.
- let _ = self.poll.deregister(&self.rx);
let _ = self.pty.deregister(&self.poll);
(self, state)
})
}
}
+
+/// Helper type which tracks how much of a buffer has been written.
+struct Writing {
+ source: Cow<'static, [u8]>,
+ written: usize,
+}
+
+pub struct Notifier(pub EventLoopSender);
+
+impl event::Notify for Notifier {
+ fn notify<B>(&self, bytes: B)
+ where
+ B: Into<Cow<'static, [u8]>>,
+ {
+ let bytes = bytes.into();
+ // Terminal hangs if we send 0 bytes through.
+ if bytes.len() == 0 {
+ return;
+ }
+
+ self.0.send(Msg::Input(bytes));
+ }
+}
+
+impl event::OnResize for Notifier {
+ fn on_resize(&mut self, window_size: WindowSize) {
+ self.0.send(Msg::Resize(window_size));
+ }
+}
+
+pub struct EventLoopSender {
+ sender: Sender<Msg>,
+ poller: Arc<polling::Poller>,
+}
+
+impl EventLoopSender {
+ pub fn send(&self, msg: Msg) {
+ let _ = self.sender.send(msg);
+ let _ = self.poller.notify();
+ }
+}
+
+/// All of the mutable state needed to run the event loop.
+///
+/// Contains list of items to write, current write state, etc. Anything that
+/// would otherwise be mutated on the `EventLoop` goes here.
+#[derive(Default)]
+pub struct State {
+ write_list: VecDeque<Cow<'static, [u8]>>,
+ writing: Option<Writing>,
+ parser: ansi::Processor,
+}
+
+impl State {
+ #[inline]
+ fn ensure_next(&mut self) {
+ if self.writing.is_none() {
+ self.goto_next();
+ }
+ }
+
+ #[inline]
+ fn goto_next(&mut self) {
+ self.writing = self.write_list.pop_front().map(Writing::new);
+ }
+
+ #[inline]
+ fn take_current(&mut self) -> Option<Writing> {
+ self.writing.take()
+ }
+
+ #[inline]
+ fn needs_write(&self) -> bool {
+ self.writing.is_some() || !self.write_list.is_empty()
+ }
+
+ #[inline]
+ fn set_current(&mut self, new: Option<Writing>) {
+ self.writing = new;
+ }
+}
+
+impl Writing {
+ #[inline]
+ fn new(c: Cow<'static, [u8]>) -> Writing {
+ Writing { source: c, written: 0 }
+ }
+
+ #[inline]
+ fn advance(&mut self, n: usize) {
+ self.written += n;
+ }
+
+ #[inline]
+ fn remaining_bytes(&self) -> &[u8] {
+ &self.source[self.written..]
+ }
+
+ #[inline]
+ fn finished(&self) -> bool {
+ self.written >= self.source.len()
+ }
+}
+
+struct PeekableReceiver<T> {
+ rx: Receiver<T>,
+ peeked: Option<T>,
+}
+
+impl<T> PeekableReceiver<T> {
+ fn new(rx: Receiver<T>) -> Self {
+ Self { rx, peeked: None }
+ }
+
+ fn peek(&mut self) -> Option<&T> {
+ if self.peeked.is_none() {
+ self.peeked = self.rx.try_recv().ok();
+ }
+
+ self.peeked.as_ref()
+ }
+
+ fn recv(&mut self) -> Option<T> {
+ if self.peeked.is_some() {
+ self.peeked.take()
+ } else {
+ match self.rx.try_recv() {
+ Err(TryRecvError::Disconnected) => panic!("event loop channel closed"),
+ res => res.ok(),
+ }
+ }
+ }
+}
diff --git a/alacritty_terminal/src/tty/mod.rs b/alacritty_terminal/src/tty/mod.rs
index 4ce277b3..315f008c 100644
--- a/alacritty_terminal/src/tty/mod.rs
+++ b/alacritty_terminal/src/tty/mod.rs
@@ -1,10 +1,13 @@
//! TTY related functionality.
use std::path::PathBuf;
+use std::sync::Arc;
use std::{env, io};
use crate::config::Config;
+use polling::{Event, PollMode, Poller};
+
#[cfg(not(windows))]
mod unix;
#[cfg(not(windows))]
@@ -22,20 +25,15 @@ pub trait EventedReadWrite {
type Reader: io::Read;
type Writer: io::Write;
- fn register(
- &mut self,
- _: &mio::Poll,
- _: &mut dyn Iterator<Item = mio::Token>,
- _: mio::Ready,
- _: mio::PollOpt,
- ) -> io::Result<()>;
- fn reregister(&mut self, _: &mio::Poll, _: mio::Ready, _: mio::PollOpt) -> io::Result<()>;
- fn deregister(&mut self, _: &mio::Poll) -> io::Result<()>;
+ /// # Safety
+ ///
+ /// The underlying sources must outlive their registration in the `Poller`.
+ unsafe fn register(&mut self, _: &Arc<Poller>, _: Event, _: PollMode) -> io::Result<()>;
+ fn reregister(&mut self, _: &Arc<Poller>, _: Event, _: PollMode) -> io::Result<()>;
+ fn deregister(&mut self, _: &Arc<Poller>) -> io::Result<()>;
fn reader(&mut self) -> &mut Self::Reader;
- fn read_token(&self) -> mio::Token;
fn writer(&mut self) -> &mut Self::Writer;
- fn write_token(&self) -> mio::Token;
}
/// Events concerning TTY child processes.
@@ -51,8 +49,6 @@ pub enum ChildEvent {
/// notified if the PTY child process does something we care about (other than writing to the TTY).
/// In particular, this allows for race-free child exit notification on UNIX (cf. `SIGCHLD`).
pub trait EventedPty: EventedReadWrite {
- fn child_event_token(&self) -> mio::Token;
-
/// Tries to retrieve an event.
///
/// Returns `Some(event)` on success, or `None` if there are no events to retrieve.
diff --git a/alacritty_terminal/src/tty/unix.rs b/alacritty_terminal/src/tty/unix.rs
index fd99edee..4523666e 100644
--- a/alacritty_terminal/src/tty/unix.rs
+++ b/alacritty_terminal/src/tty/unix.rs
@@ -2,26 +2,34 @@
use std::ffi::CStr;
use std::fs::File;
-use std::io::{Error, ErrorKind, Result};
+use std::io::{Error, ErrorKind, Read, Result};
use std::mem::MaybeUninit;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
+use std::os::unix::net::UnixStream;
use std::os::unix::process::CommandExt;
use std::process::{Child, Command, Stdio};
+use std::sync::Arc;
use std::{env, ptr};
use libc::{self, c_int, winsize, TIOCSCTTY};
use log::error;
-use mio::unix::EventedFd;
use nix::pty::openpty;
#[cfg(any(target_os = "linux", target_os = "macos"))]
use nix::sys::termios::{self, InputFlags, SetArg};
+use polling::{Event, PollMode, Poller};
use signal_hook::consts as sigconsts;
-use signal_hook_mio::v0_6::Signals;
+use signal_hook::low_level::pipe as signal_pipe;
use crate::config::PtyConfig;
use crate::event::{OnResize, WindowSize};
use crate::tty::{ChildEvent, EventedPty, EventedReadWrite};
+// Interest in PTY read/writes.
+pub(crate) const PTY_READ_WRITE_TOKEN: usize = 0;
+
+// Interest in new child events.
+pub(crate) const PTY_CHILD_EVENT_TOKEN: usize = 1;
+
macro_rules! die {
($($arg:tt)*) => {{
error!($($arg)*);
@@ -103,9 +111,7 @@ fn get_pw_entry(buf: &mut [i8; 1024]) -> Result<Passwd<'_>> {
pub struct Pty {
child: Child,
file: File,
- token: mio::Token,
- signals: Signals,
- signals_token: mio::Token,
+ signals: UnixStream,
}
impl Pty {
@@ -254,7 +260,14 @@ pub fn new(config: &PtyConfig, window_size: WindowSize, window_id: u64) -> Resul
}
// Prepare signal handling before spawning child.
- let signals = Signals::new([sigconsts::SIGCHLD]).expect("error preparing signal handling");
+ let signals = {
+ let (sender, recv) = UnixStream::pair()?;
+
+ // Register the recv end of the pipe for SIGCHLD.
+ signal_pipe::register(sigconsts::SIGCHLD, sender)?;
+ recv.set_nonblocking(true)?;
+ recv
+ };
match builder.spawn() {
Ok(child) => {
@@ -264,13 +277,7 @@ pub fn new(config: &PtyConfig, window_size: WindowSize, window_id: u64) -> Resul
set_nonblocking(master);
}
- let mut pty = Pty {
- child,
- file: unsafe { File::from_raw_fd(master) },
- token: mio::Token::from(0),
- signals,
- signals_token: mio::Token::from(0),
- };
+ let mut pty = Pty { child, file: unsafe { File::from_raw_fd(master) }, signals };
pty.on_resize(window_size);
Ok(pty)
},
@@ -300,46 +307,47 @@ impl EventedReadWrite for Pty {
type Writer = File;
#[inline]
- fn register(
+ unsafe fn register(
&mut self,
- poll: &mio::Poll,
- token: &mut dyn Iterator<Item = mio::Token>,
- interest: mio::Ready,
- poll_opts: mio::PollOpt,
+ poll: &Arc<Poller>,
+ mut interest: Event,
+ poll_opts: PollMode,
) -> Result<()> {
- self.token = token.next().unwrap();
- poll.register(&EventedFd(&self.file.as_raw_fd()), self.token, interest, poll_opts)?;
+ interest.key = PTY_READ_WRITE_TOKEN;
+ unsafe {
+ poll.add_with_mode(&self.file, interest, poll_opts)?;
+ }
- self.signals_token = token.next().unwrap();
- poll.register(
- &self.signals,
- self.signals_token,
- mio::Ready::readable(),
- mio::PollOpt::level(),
- )
+ unsafe {
+ poll.add_with_mode(
+ &self.signals,
+ Event::readable(PTY_CHILD_EVENT_TOKEN),
+ PollMode::Level,
+ )
+ }
}
#[inline]
fn reregister(
&mut self,
- poll: &mio::Poll,
- interest: mio::Ready,
- poll_opts: mio::PollOpt,
+ poll: &Arc<Poller>,
+ mut interest: Event,
+ poll_opts: PollMode,
) -> Result<()> {
- poll.reregister(&EventedFd(&self.file.as_raw_fd()), self.token, interest, poll_opts)?;
+ interest.key = PTY_READ_WRITE_TOKEN;
+ poll.modify_with_mode(&self.file, interest, poll_opts)?;
- poll.reregister(
+ poll.modify_with_mode(
&self.signals,
- self.signals_token,
- mio::Ready::readable(),
- mio::PollOpt::level(),
+ Event::readable(PTY_CHILD_EVENT_TOKEN),
+ PollMode::Level,
)
}
#[inline]
- fn deregister(&mut self, poll: &mio::Poll) -> Result<()> {
- poll.deregister(&EventedFd(&self.file.as_raw_fd()))?;
- poll.deregister(&self.signals)
+ fn deregister(&mut self, poll: &Arc<Poller>) -> Result<()> {
+ poll.delete(&self.file)?;
+ poll.delete(&self.signals)
}
#[inline]
@@ -348,43 +356,32 @@ impl EventedReadWrite for Pty {
}
#[inline]
- fn read_token(&self) -> mio::Token {
- self.token
- }
-
- #[inline]
fn writer(&mut self) -> &mut File {
&mut self.file
}
-
- #[inline]
- fn write_token(&self) -> mio::Token {
- self.token
- }
}
impl EventedPty for Pty {
#[inline]
fn next_child_event(&mut self) -> Option<ChildEvent> {
- self.signals.pending().next().and_then(|signal| {
- if signal != sigconsts::SIGCHLD {
- return None;
- }
-
- match self.child.try_wait() {
- Err(e) => {
- error!("Error checking child process termination: {}", e);
- None
- },
- Ok(None) => None,
- Ok(_) => Some(ChildEvent::Exited),
+ // See if there has been a SIGCHLD.
+ let mut buf = [0u8; 1];
+ if let Err(err) = self.signals.read(&mut buf) {
+ if err.kind() != ErrorKind::WouldBlock {
+ error!("Error reading from signal pipe: {}", err);
}
- })
- }
+ return None;
+ }
- #[inline]
- fn child_event_token(&self) -> mio::Token {
- self.signals_token
+ // Match on the child process.
+ match self.child.try_wait() {
+ Err(err) => {
+ error!("Error checking child process termination: {}", err);
+ None
+ },
+ Ok(None) => None,
+ Ok(_) => Some(ChildEvent::Exited),
+ }
}
}
diff --git a/alacritty_terminal/src/tty/windows/blocking.rs b/alacritty_terminal/src/tty/windows/blocking.rs
new file mode 100644
index 00000000..3c74be4a
--- /dev/null
+++ b/alacritty_terminal/src/tty/windows/blocking.rs
@@ -0,0 +1,276 @@
+//! Code for running a reader/writer on another thread while driving it through `polling`.
+
+use std::io::prelude::*;
+use std::marker::PhantomData;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll, Wake, Waker};
+use std::{io, thread};
+
+use piper::{pipe, Reader, Writer};
+use polling::os::iocp::{CompletionPacket, PollerIocpExt};
+use polling::{Event, PollMode, Poller};
+
+use crate::thread::spawn_named;
+
+struct Registration {
+ interest: Mutex<Option<Interest>>,
+ end: PipeEnd,
+}
+
+#[derive(Copy, Clone)]
+enum PipeEnd {
+ Reader,
+ Writer,
+}
+
+struct Interest {
+ /// The event to send about completion.
+ event: Event,
+
+ /// The poller to send the event to.
+ poller: Arc<Poller>,
+
+ /// The mode that we are in.
+ mode: PollMode,
+}
+
+/// Poll a reader in another thread.
+pub struct UnblockedReader<R> {
+ /// The event to send about completion.
+ interest: Arc<Registration>,
+
+ /// The pipe that we are reading from.
+ pipe: Reader,
+
+ /// Is this the first time registering?
+ first_register: bool,
+
+ /// We logically own the reader, but we don't actually use it.
+ _reader: PhantomData<R>,
+}
+
+impl<R: Read + Send + 'static> UnblockedReader<R> {
+ /// Spawn a new unblocked reader.
+ pub fn new(mut source: R, pipe_capacity: usize) -> Self {
+ // Create a new pipe.
+ let (reader, mut writer) = pipe(pipe_capacity);
+ let interest = Arc::new(Registration {
+ interest: Mutex::<Option<Interest>>::new(None),
+ end: PipeEnd::Reader,
+ });
+
+ // Spawn the reader thread.
+ spawn_named("alacritty-tty-reader-thread", move || {
+ let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
+ let mut context = Context::from_waker(&waker);
+
+ loop {
+ // Read from the reader into the pipe.
+ match writer.poll_fill(&mut context, &mut source) {
+ Poll::Ready(Ok(0)) => {
+ // Either the pipe is closed or the reader is at its EOF.
+ // In any case, we are done.
+ return;
+ },
+
+ Poll::Ready(Ok(_)) => {
+ // Keep reading.
+ continue;
+ },
+
+ Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => {
+ // We were interrupted; continue.
+ continue;
+ },
+
+ Poll::Ready(Err(e)) => {
+ log::error!("error writing to pipe: {}", e);
+ return;
+ },
+
+ Poll::Pending => {
+ // We are now waiting on the other end to advance. Park the
+ // thread until they do.
+ thread::park();
+ },
+ }
+ }
+ });
+
+ Self { interest, pipe: reader, first_register: true, _reader: PhantomData }
+ }
+
+ /// Register interest in the reader.
+ pub fn register(&mut self, poller: &Arc<Poller>, event: Event, mode: PollMode) {
+ let mut interest = self.interest.interest.lock().unwrap();
+ *interest = Some(Interest { event, poller: poller.clone(), mode });
+
+ // Send the event to start off with if we have any data.
+ if (!self.pipe.is_empty() && event.readable) || self.first_register {
+ self.first_register = false;
+ poller.post(CompletionPacket::new(event)).ok();
+ }
+ }
+
+ /// Deregister interest in the reader.
+ pub fn deregister(&self) {
+ let mut interest = self.interest.interest.lock().unwrap();
+ *interest = None;
+ }
+
+ /// Try to read from the reader.
+ pub fn try_read(&mut self, buf: &mut [u8]) -> usize {
+ let waker = Waker::from(self.interest.clone());
+
+ match self.pipe.poll_drain_bytes(&mut Context::from_waker(&waker), buf) {
+ Poll::Pending => 0,
+ Poll::Ready(n) => n,
+ }
+ }
+}
+
+impl<R: Read + Send + 'static> Read for UnblockedReader<R> {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ Ok(self.try_read(buf))
+ }
+}
+
+/// Poll a writer in another thread.
+pub struct UnblockedWriter<W> {
+ /// The interest to send about completion.
+ interest: Arc<Registration>,
+
+ /// The pipe that we are writing to.
+ pipe: Writer,
+
+ /// We logically own the writer, but we don't actually use it.
+ _reader: PhantomData<W>,
+}
+
+impl<W: Write + Send + 'static> UnblockedWriter<W> {
+ /// Spawn a new unblocked writer.
+ pub fn new(mut sink: W, pipe_capacity: usize) -> Self {
+ // Create a new pipe.
+ let (mut reader, writer) = pipe(pipe_capacity);
+ let interest = Arc::new(Registration {
+ interest: Mutex::<Option<Interest>>::new(None),
+ end: PipeEnd::Writer,
+ });
+
+ // Spawn the writer thread.
+ spawn_named("alacritty-tty-writer-thread", move || {
+ let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
+ let mut context = Context::from_waker(&waker);
+
+ loop {
+ // Write from the pipe into the writer.
+ match reader.poll_drain(&mut context, &mut sink) {
+ Poll::Ready(Ok(0)) => {
+ // Either the pipe is closed or the writer is full.
+ // In any case, we are done.
+ return;
+ },
+
+ Poll::Ready(Ok(_)) => {
+ // Keep writing.
+ continue;
+ },
+
+ Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => {
+ // We were interrupted; continue.
+ continue;
+ },
+
+ Poll::Ready(Err(e)) => {
+ log::error!("error writing to pipe: {}", e);
+ return;
+ },
+
+ Poll::Pending => {
+ // We are now waiting on the other end to advance. Park the
+ // thread until they do.
+ thread::park();
+ },
+ }
+ }
+ });
+
+ Self { interest, pipe: writer, _reader: PhantomData }
+ }
+
+ /// Register interest in the writer.
+ pub fn register(&self, poller: &Arc<Poller>, event: Event, mode: PollMode) {
+ let mut interest = self.interest.interest.lock().unwrap();
+ *interest = Some(Interest { event, poller: poller.clone(), mode });
+
+ // Send the event to start off with if we have room for data.
+ if !self.pipe.is_full() && event.writable {
+ poller.post(CompletionPacket::new(event)).ok();
+ }
+ }
+
+ /// Deregister interest in the writer.
+ pub fn deregister(&self) {
+ let mut interest = self.interest.interest.lock().unwrap();
+ *interest = None;
+ }
+
+ /// Try to write to the writer.
+ pub fn try_write(&mut self, buf: &[u8]) -> usize {
+ let waker = Waker::from(self.interest.clone());
+
+ match self.pipe.poll_fill_bytes(&mut Context::from_waker(&waker), buf) {
+ Poll::Pending => 0,
+ Poll::Ready(n) => n,
+ }
+ }
+}
+
+impl<W: Write + Send + 'static> Write for UnblockedWriter<W> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ Ok(self.try_write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ // Nothing to flush.
+ Ok(())
+ }
+}
+
+struct ThreadWaker(thread::Thread);
+
+impl Wake for ThreadWaker {
+ fn wake(self: Arc<Self>) {
+ self.0.unpark();
+ }
+
+ fn wake_by_ref(self: &Arc<Self>) {
+ self.0.unpark();
+ }
+}
+
+impl Wake for Registration {
+ fn wake(self: Arc<Self>) {
+ self.wake_by_ref();
+ }
+
+ fn wake_by_ref(self: &Arc<Self>) {
+ let mut interest_lock = self.interest.lock().unwrap();
+ if let Some(interest) = interest_lock.as_ref() {
+ // Send the event to the poller.
+ let send_event = match self.end {
+ PipeEnd::Reader => interest.event.readable,
+ PipeEnd::Writer => interest.event.writable,
+ };
+
+ if send_event {
+ interest.poller.post(CompletionPacket::new(interest.event)).ok();
+
+ // Clear the event if we're in oneshot mode.
+ if matches!(interest.mode, PollMode::Oneshot | PollMode::EdgeOneshot) {
+ *interest_lock = None;
+ }
+ }
+ }
+ }
+}
diff --git a/alacritty_terminal/src/tty/windows/child.rs b/alacritty_terminal/src/tty/windows/child.rs
index 19c8a195..6bc9ed20 100644
--- a/alacritty_terminal/src/tty/windows/child.rs
+++ b/alacritty_terminal/src/tty/windows/child.rs
@@ -1,8 +1,10 @@
use std::ffi::c_void;
use std::io::Error;
use std::sync::atomic::{AtomicPtr, Ordering};
+use std::sync::{mpsc, Arc, Mutex};
-use mio_extras::channel::{channel, Receiver, Sender};
+use polling::os::iocp::{CompletionPacket, PollerIocpExt};
+use polling::{Event, Poller};
use windows_sys::Win32::Foundation::{BOOLEAN, HANDLE};
use windows_sys::Win32::System::Threading::{
@@ -12,27 +14,43 @@ use windows_sys::Win32::System::Threading::{
use crate::tty::ChildEvent;
+struct Interest {
+ poller: Arc<Poller>,
+ event: Event,
+}
+
+struct ChildExitSender {
+ sender: mpsc::Sender<ChildEvent>,
+ interest: Arc<Mutex<Option<Interest>>>,
+}
+
/// WinAPI callback to run when child process exits.
extern "system" fn child_exit_callback(ctx: *mut c_void, timed_out: BOOLEAN) {
if timed_out != 0 {
return;
}
- let event_tx: Box<_> = unsafe { Box::from_raw(ctx as *mut Sender<ChildEvent>) };
- let _ = event_tx.send(ChildEvent::Exited);
+ let event_tx: Box<_> = unsafe { Box::from_raw(ctx as *mut ChildExitSender) };
+ let _ = event_tx.sender.send(ChildEvent::Exited);
+ let interest = event_tx.interest.lock().unwrap();
+ if let Some(interest) = interest.as_ref() {
+ interest.poller.post(CompletionPacket::new(interest.event)).ok();
+ }
}
pub struct ChildExitWatcher {
wait_handle: AtomicPtr<c_void>,
- event_rx: Receiver<ChildEvent>,
+ event_rx: mpsc::Receiver<ChildEvent>,
+ interest: Arc<Mutex<Option<Interest>>>,
}
impl ChildExitWatcher {
pub fn new(child_handle: HANDLE) -> Result<ChildExitWatcher, Error> {
- let (event_tx, event_rx) = channel::<ChildEvent>();
+ let (event_tx, event_rx) = mpsc::channel();
let mut wait_handle: HANDLE = 0;
- let sender_ref = Box::new(event_tx);
+ let interest = Arc::new(Mutex::new(None));
+ let sender_ref = Box::new(ChildExitSender { sender: event_tx, interest: interest.clone() });
let success = unsafe {
RegisterWaitForSingleObject(
@@ -51,13 +69,22 @@ impl ChildExitWatcher {
Ok(ChildExitWatcher {
wait_handle: AtomicPtr::from(wait_handle as *mut c_void),
event_rx,
+ interest,
})
}
}
- pub fn event_rx(&self) -> &Receiver<ChildEvent> {
+ pub fn event_rx(&self) -> &mpsc::Receiver<ChildEvent> {
&self.event_rx
}
+
+ pub fn register(&self, poller: &Arc<Poller>, event: Event) {
+ *self.interest.lock().unwrap() = Some(Interest { poller: poller.clone(), event });
+ }
+
+ pub fn deregister(&self) {
+ *self.interest.lock().unwrap() = None;
+ }
}
impl Drop for ChildExitWatcher {
@@ -72,36 +99,28 @@ impl Drop for ChildExitWatcher {
mod tests {
use std::os::windows::io::AsRawHandle;
use std::process::Command;
+ use std::sync::Arc;
use std::time::Duration;
- use mio::{Events, Poll, PollOpt, Ready, Token};
-
+ use super::super::PTY_CHILD_EVENT_TOKEN;
use super::*;
#[test]
pub fn event_is_emitted_when_child_exits() {
const WAIT_TIMEOUT: Duration = Duration::from_millis(200);
+ let poller = Arc::new(Poller::new().unwrap());
+
let mut child = Command::new("cmd.exe").spawn().unwrap();
let child_exit_watcher = ChildExitWatcher::new(child.as_raw_handle() as HANDLE).unwrap();
-
- let mut events = Events::with_capacity(1);
- let poll = Poll::new().unwrap();
- let child_events_token = Token::from(0usize);
-
- poll.register(
- child_exit_watcher.event_rx(),
- child_events_token,
- Ready::readable(),
- PollOpt::oneshot(),
- )
- .unwrap();
+ child_exit_watcher.register(&poller, Event::readable(PTY_CHILD_EVENT_TOKEN));
child.kill().unwrap();
// Poll for the event or fail with timeout if nothing has been sent.
- poll.poll(&mut events, Some(WAIT_TIMEOUT)).unwrap();
- assert_eq!(events.iter().next().unwrap().token(), child_events_token);
+ let mut events = polling::Events::new();
+ poller.wait(&mut events, Some(WAIT_TIMEOUT)).unwrap();
+ assert_eq!(events.iter().next().unwrap().key, PTY_CHILD_EVENT_TOKEN);
// Verify that at least one `ChildEvent::Exited` was received.
assert_eq!(child_exit_watcher.event_rx().try_recv(), Ok(ChildEvent::Exited));
}
diff --git a/alacritty_terminal/src/tty/windows/conpty.rs b/alacritty_terminal/src/tty/windows/conpty.rs
index c9ed631e..12189371 100644
--- a/alacritty_terminal/src/tty/windows/conpty.rs
+++ b/alacritty_terminal/src/tty/windows/conpty.rs
@@ -3,8 +3,6 @@ use std::io::Error;
use std::os::windows::io::IntoRawHandle;
use std::{mem, ptr};
-use mio_anonymous_pipes::{EventedAnonRead, EventedAnonWrite};
-
use windows_sys::core::{HRESULT, PWSTR};
use windows_sys::Win32::Foundation::{HANDLE, S_OK};
use windows_sys::Win32::System::Console::{
@@ -21,9 +19,12 @@ use windows_sys::Win32::System::Threading::{
use crate::config::PtyConfig;
use crate::event::{OnResize, WindowSize};
+use crate::tty::windows::blocking::{UnblockedReader, UnblockedWriter};
use crate::tty::windows::child::ChildExitWatcher;
use crate::tty::windows::{cmdline, win32_string, Pty};
+const PIPE_CAPACITY: usize = crate::event_loop::READ_BUFFER_SIZE;
+
/// Load the pseudoconsole API from conpty.dll if possible, otherwise use the
/// standard Windows API.
///
@@ -220,8 +221,8 @@ pub fn new(config: &PtyConfig, window_size: WindowSize) -> Option<Pty> {
}
}
- let conin = EventedAnonWrite::new(conin);
- let conout = EventedAnonRead::new(conout);
+ let conin = UnblockedWriter::new(conin, PIPE_CAPACITY);
+ let conout = UnblockedReader::new(conout, PIPE_CAPACITY);
let child_watcher = ChildExitWatcher::new(proc_info.hProcess).unwrap();
let conpty = Conpty { handle: pty_handle as HPCON, api };
diff --git a/alacritty_terminal/src/tty/windows/mod.rs b/alacritty_terminal/src/tty/windows/mod.rs
index 57925f4c..080f6e83 100644
--- a/alacritty_terminal/src/tty/windows/mod.rs
+++ b/alacritty_terminal/src/tty/windows/mod.rs
@@ -3,17 +3,27 @@ use std::io::{self, Error, ErrorKind, Result};
use std::iter::once;
use std::os::windows::ffi::OsStrExt;
use std::sync::mpsc::TryRecvError;
+use std::sync::Arc;
use crate::config::{Program, PtyConfig};
use crate::event::{OnResize, WindowSize};
use crate::tty::windows::child::ChildExitWatcher;
use crate::tty::{ChildEvent, EventedPty, EventedReadWrite};
+mod blocking;
mod child;
mod conpty;
+use blocking::{UnblockedReader, UnblockedWriter};
use conpty::Conpty as Backend;
-use mio_anonymous_pipes::{EventedAnonRead as ReadPipe, EventedAnonWrite as WritePipe};
+use miow::pipe::{AnonRead, AnonWrite};
+use polling::{Event, Poller};
+
+pub const PTY_CHILD_EVENT_TOKEN: usize = 1;
+pub const PTY_READ_WRITE_TOKEN: usize = 2;
+
+type ReadPipe = UnblockedReader<AnonRead>;
+type WritePipe = UnblockedWriter<AnonWrite>;
pub struct Pty {
// XXX: Backend is required to be the first field, to ensure correct drop order. Dropping
@@ -21,9 +31,6 @@ pub struct Pty {
backend: Backend,
conout: ReadPipe,
conin: WritePipe,
- read_token: mio::Token,
- write_token: mio::Token,
- child_event_token: mio::Token,
child_watcher: ChildExitWatcher,
}
@@ -39,51 +46,29 @@ impl Pty {
conin: impl Into<WritePipe>,
child_watcher: ChildExitWatcher,
) -> Self {
- Self {
- backend: backend.into(),
- conout: conout.into(),
- conin: conin.into(),
- read_token: 0.into(),
- write_token: 0.into(),
- child_event_token: 0.into(),
- child_watcher,
- }
+ Self { backend: backend.into(), conout: conout.into(), conin: conin.into(), child_watcher }
}
}
+fn with_key(mut event: Event, key: usize) -> Event {
+ event.key = key;
+ event
+}
+
impl EventedReadWrite for Pty {
type Reader = ReadPipe;
type Writer = WritePipe;
#[inline]
- fn register(
+ unsafe fn register(
&mut self,
- poll: &mio::Poll,
- token: &mut dyn Iterator<Item = mio::Token>,
- interest: mio::Ready,
- poll_opts: mio::PollOpt,
+ poll: &Arc<Poller>,
+ interest: polling::Event,
+ poll_opts: polling::PollMode,
) -> io::Result<()> {
- self.read_token = token.next().unwrap();
- self.write_token = token.next().unwrap();
-
- if interest.is_readable() {
- poll.register(&self.conout, self.read_token, mio::Ready::readable(), poll_opts)?
- } else {
- poll.register(&self.conout, self.read_token, mio::Ready::empty(), poll_opts)?
- }
- if interest.is_writable() {
- poll.register(&self.conin, self.write_token, mio::Ready::writable(), poll_opts)?
- } else {
- poll.register(&self.conin, self.write_token, mio::Ready::empty(), poll_opts)?
- }
-
- self.child_event_token = token.next().unwrap();
- poll.register(
- self.child_watcher.event_rx(),
- self.child_event_token,
- mio::Ready::readable(),
- poll_opts,
- )?;
+ self.conin.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts);
+ self.conout.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts);
+ self.child_watcher.register(poll, with_key(interest, PTY_CHILD_EVENT_TOKEN));
Ok(())
}
@@ -91,36 +76,23 @@ impl EventedReadWrite for Pty {
#[inline]
fn reregister(
&mut self,
- poll: &mio::Poll,
- interest: mio::Ready,
- poll_opts: mio::PollOpt,
+ poll: &Arc<Poller>,
+ interest: polling::Event,
+ poll_opts: polling::PollMode,
) -> io::Result<()> {
- if interest.is_readable() {
- poll.reregister(&self.conout, self.read_token, mio::Ready::readable(), poll_opts)?;
- } else {
- poll.reregister(&self.conout, self.read_token, mio::Ready::empty(), poll_opts)?;
- }
- if interest.is_writable() {
- poll.reregister(&self.conin, self.write_token, mio::Ready::writable(), poll_opts)?;
- } else {
- poll.reregister(&self.conin, self.write_token, mio::Ready::empty(), poll_opts)?;
- }
-
- poll.reregister(
- self.child_watcher.event_rx(),
- self.child_event_token,
- mio::Ready::readable(),
- poll_opts,
- )?;
+ self.conin.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts);
+ self.conout.register(poll, with_key(interest, PTY_READ_WRITE_TOKEN), poll_opts);
+ self.child_watcher.register(poll, with_key(interest, PTY_CHILD_EVENT_TOKEN));
Ok(())
}
#[inline]
- fn deregister(&mut self, poll: &mio::Poll) -> io::Result<()> {
- poll.deregister(&self.conout)?;
- poll.deregister(&self.conin)?;
- poll.deregister(self.child_watcher.event_rx())?;
+ fn deregister(&mut self, _poll: &Arc<Poller>) -> io::Result<()> {
+ self.conin.deregister();
+ self.conout.deregister();
+ self.child_watcher.deregister();
+
Ok(())
}
@@ -130,26 +102,12 @@ impl EventedReadWrite for Pty {
}
#[inline]
- fn read_token(&self) -> mio::Token {
- self.read_token
- }
-
- #[inline]
fn writer(&mut self) -> &mut Self::Writer {
&mut self.conin
}
-
- #[inline]
- fn write_token(&self) -> mio::Token {
- self.write_token
- }
}
impl EventedPty for Pty {
- fn child_event_token(&self) -> mio::Token {
- self.child_event_token
- }
-
fn next_child_event(&mut self) -> Option<ChildEvent> {
match self.child_watcher.event_rx().try_recv() {
Ok(ev) => Some(ev),