aboutsummaryrefslogtreecommitdiff
path: root/src/event_loop.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event_loop.rs')
-rw-r--r--src/event_loop.rs319
1 files changed, 193 insertions, 126 deletions
diff --git a/src/event_loop.rs b/src/event_loop.rs
index 51eeccd1..9584e416 100644
--- a/src/event_loop.rs
+++ b/src/event_loop.rs
@@ -1,5 +1,6 @@
//! The main event loop which performs I/O on the pseudoterminal
use std::borrow::Cow;
+use std::collections::VecDeque;
use std::io::{self, ErrorKind};
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
@@ -14,6 +15,17 @@ use sync::FairMutex;
use super::Flag;
+/// Messages that may be sent to the `EventLoop`
+#[derive(Debug)]
+pub enum Msg {
+ /// Data that should be written to the pty
+ Input(Cow<'static, [u8]>),
+}
+
+/// The main event!.. loop.
+///
+/// Handles all the pty I/O and runs the pty parser which updates terminal
+/// state.
pub struct EventLoop<Io> {
poll: mio::Poll,
pty: Io,
@@ -24,18 +36,95 @@ pub struct EventLoop<Io> {
signal_flag: Flag,
}
+/// Helper type which tracks how much of a buffer has been written.
+struct Writing {
+ source: Cow<'static, [u8]>,
+ written: usize,
+}
-#[derive(Debug)]
-pub enum Msg {
- Input(Cow<'static, [u8]>),
+/// All of the mutable state needed to run the event loop
+///
+/// Contains list of items to write, current write state, etc. Anything that
+/// would otherwise be mutated on the `EventLoop` goes here.
+pub struct State {
+ write_list: VecDeque<Cow<'static, [u8]>>,
+ writing: Option<Writing>,
+ parser: ansi::Processor,
+}
+
+impl Default for State {
+ fn default() -> State {
+ State {
+ write_list: VecDeque::new(),
+ parser: ansi::Processor::new(),
+ writing: None,
+ }
+ }
}
+impl State {
+ #[inline]
+ fn ensure_next(&mut self) {
+ if self.writing.is_none() {
+ self.goto_next();
+ }
+ }
+
+ #[inline]
+ fn goto_next(&mut self) {
+ self.writing = self.write_list
+ .pop_front()
+ .map(|c| Writing::new(c));
+ }
+
+ #[inline]
+ fn take_current(&mut self) -> Option<Writing> {
+ self.writing.take()
+ }
+
+ #[inline]
+ fn needs_write(&self) -> bool {
+ self.writing.is_some() || !self.write_list.is_empty()
+ }
+
+ #[inline]
+ fn set_current(&mut self, new: Option<Writing>) {
+ self.writing = new;
+ }
+}
+
+impl Writing {
+ #[inline]
+ fn new(c: Cow<'static, [u8]>) -> Writing {
+ Writing { source: c, written: 0 }
+ }
+
+ #[inline]
+ fn advance(&mut self, n: usize) {
+ self.written += n;
+ }
+
+ #[inline]
+ fn remaining_bytes(&self) -> &[u8] {
+ &self.source[self.written..]
+ }
+
+ #[inline]
+ fn finished(&self) -> bool {
+ self.written >= self.source.len()
+ }
+}
+
+/// mio::Token for the event loop channel
const CHANNEL: mio::Token = mio::Token(0);
+
+/// mio::Token for the pty file descriptor
const PTY: mio::Token = mio::Token(1);
impl<Io> EventLoop<Io>
where Io: io::Read + io::Write + Send + AsRawFd + 'static
{
+ /// Create a new event loop
pub fn new(
terminal: Arc<FairMutex<Term>>,
proxy: ::glutin::WindowProxy,
@@ -58,166 +147,141 @@ impl<Io> EventLoop<Io>
self.tx.clone()
}
- pub fn spawn(self) -> thread::JoinHandle<()> {
+ #[inline]
+ fn channel_event(&mut self, state: &mut State) {
+ while let Ok(msg) = self.rx.try_recv() {
+ match msg {
+ Msg::Input(input) => {
+ state.write_list.push_back(input);
+ }
+ }
+ }
- struct Writing {
- source: Cow<'static, [u8]>,
- written: usize,
+ self.poll.reregister(
+ &self.rx, CHANNEL,
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot()
+ ).expect("reregister channel");
+
+ if state.needs_write() {
+ self.poll.reregister(
+ &EventedFd(&self.pty.as_raw_fd()),
+ PTY,
+ Ready::readable() | Ready::writable(),
+ PollOpt::edge() | PollOpt::oneshot()
+ ).expect("reregister fd after channel recv");
}
+ }
- impl Writing {
- #[inline]
- fn new(c: Cow<'static, [u8]>) -> Writing {
- Writing { source: c, written: 0 }
- }
+ #[inline]
+ fn pty_read(&mut self, state: &mut State, buf: &mut [u8]) {
+ loop {
+ match self.pty.read(&mut buf[..]) {
+ Ok(0) => break,
+ Ok(got) => {
+ let mut terminal = self.terminal.lock();
+ for byte in &buf[..got] {
+ state.parser.advance(&mut *terminal, *byte);
+ }
- #[inline]
- fn advance(&mut self, n: usize) {
- self.written += n;
- }
+ terminal.dirty = true;
- #[inline]
- fn remaining_bytes(&self) -> &[u8] {
- &self.source[self.written..]
+ // Only wake up the event loop if it hasn't already been
+ // signaled. This is a really important optimization because
+ // waking up the event loop redundantly burns *a lot* of
+ // cycles.
+ if !self.signal_flag.get() {
+ self.proxy.wakeup_event_loop();
+ self.signal_flag.set(true);
+ }
+ },
+ Err(err) => {
+ match err.kind() {
+ ErrorKind::WouldBlock => break,
+ _ => panic!("unexpected read err: {:?}", err),
+ }
+ }
}
+ }
+ }
+
+ #[inline]
+ fn pty_write(&mut self, state: &mut State) {
+ state.ensure_next();
+
+ 'write_many: while let Some(mut current) = state.take_current() {
+ 'write_one: loop {
+ match self.pty.write(current.remaining_bytes()) {
+ Ok(0) => {
+ state.set_current(Some(current));
+ break 'write_many;
+ },
+ Ok(n) => {
+ current.advance(n);
+ if current.finished() {
+ state.goto_next();
+ break 'write_one;
+ }
+ },
+ Err(err) => {
+ state.set_current(Some(current));
+ match err.kind() {
+ ErrorKind::WouldBlock => break 'write_many,
+ // TODO
+ _ => panic!("unexpected err: {:?}", err),
+ }
+ }
+ }
- #[inline]
- fn finished(&self) -> bool {
- self.written >= self.source.len()
}
}
+ }
+ pub fn spawn(mut self, state: Option<State>) -> thread::JoinHandle<(EventLoop<Io>, State)> {
thread::spawn_named("pty reader", move || {
-
- let EventLoop { poll, mut pty, rx, terminal, proxy, signal_flag, .. } = self;
-
-
+ let mut state = state.unwrap_or_else(Default::default);
let mut buf = [0u8; 4096];
- let mut pty_parser = ansi::Processor::new();
- let fd = pty.as_raw_fd();
+
+ let fd = self.pty.as_raw_fd();
let fd = EventedFd(&fd);
- poll.register(&rx, CHANNEL, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
- .unwrap();
- poll.register(&fd, PTY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
- .unwrap();
+ let poll_opts = PollOpt::edge() | PollOpt::oneshot();
+
+ self.poll.register(&self.rx, CHANNEL, Ready::readable(), poll_opts).unwrap();
+ self.poll.register(&fd, PTY, Ready::readable(), poll_opts).unwrap();
let mut events = Events::with_capacity(1024);
- let mut write_list = ::std::collections::VecDeque::new();
- let mut writing = None;
'event_loop: loop {
- poll.poll(&mut events, None).expect("poll ok");
+ self.poll.poll(&mut events, None).expect("poll ok");
for event in events.iter() {
match event.token() {
- CHANNEL => {
- while let Ok(msg) = rx.try_recv() {
- match msg {
- Msg::Input(input) => {
- write_list.push_back(input);
- }
- }
- }
-
- poll.reregister(
- &rx, CHANNEL,
- Ready::readable(),
- PollOpt::edge() | PollOpt::oneshot()
- ).expect("reregister channel");
-
- if writing.is_some() || !write_list.is_empty() {
- poll.reregister(
- &fd,
- PTY,
- Ready::readable() | Ready::writable(),
- PollOpt::edge() | PollOpt::oneshot()
- ).expect("reregister fd after channel recv");
- }
- },
+ CHANNEL => self.channel_event(&mut state),
PTY => {
let kind = event.kind();
if kind.is_readable() {
- loop {
- match pty.read(&mut buf[..]) {
- Ok(0) => break,
- Ok(got) => {
- let mut terminal = terminal.lock();
- for byte in &buf[..got] {
- pty_parser.advance(&mut *terminal, *byte);
- }
-
- terminal.dirty = true;
-
- // Only wake up the event loop if it hasn't already been
- // signaled. This is a really important optimization
- // because waking up the event loop redundantly burns *a
- // lot* of cycles.
- if !signal_flag.get() {
- proxy.wakeup_event_loop();
- signal_flag.set(true);
- }
- },
- Err(err) => {
- match err.kind() {
- ErrorKind::WouldBlock => break,
- _ => panic!("unexpected read err: {:?}", err),
- }
- }
- }
- }
+ self.pty_read(&mut state, &mut buf);
}
if kind.is_writable() {
- if writing.is_none() {
- writing = write_list
- .pop_front()
- .map(|c| Writing::new(c));
- }
-
- 'write_list_loop: while let Some(mut write_now) = writing.take() {
- loop {
- match pty.write(write_now.remaining_bytes()) {
- Ok(0) => {
- writing = Some(write_now);
- break 'write_list_loop;
- },
- Ok(n) => {
- write_now.advance(n);
- if write_now.finished() {
- writing = write_list
- .pop_front()
- .map(|next| Writing::new(next));
-
- break;
- } else {
- }
- },
- Err(err) => {
- writing = Some(write_now);
- match err.kind() {
- ErrorKind::WouldBlock => break 'write_list_loop,
- // TODO
- _ => panic!("unexpected err: {:?}", err),
- }
- }
- }
-
- }
- }
+ self.pty_write(&mut state);
}
if kind.is_hup() {
break 'event_loop;
}
+ // Figure out pty interest
let mut interest = Ready::readable();
- if writing.is_some() || !write_list.is_empty() {
+ if state.needs_write() {
interest.insert(Ready::writable());
}
- poll.reregister(&fd, PTY, interest, PollOpt::edge() | PollOpt::oneshot())
+ // Reregister pty
+ self.poll
+ .reregister(&fd, PTY, interest, poll_opts)
.expect("register fd after read/write");
},
_ => (),
@@ -225,7 +289,10 @@ impl<Io> EventLoop<Io>
}
}
- println!("pty reader stopped");
+ self.poll.deregister(&self.rx).expect("deregister channel");
+ self.poll.deregister(&fd).expect("deregister pty");
+
+ (self, state)
})
}
}