diff options
Diffstat (limited to 'alacritty_terminal/src/tty/windows/blocking.rs')
-rw-r--r-- | alacritty_terminal/src/tty/windows/blocking.rs | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/alacritty_terminal/src/tty/windows/blocking.rs b/alacritty_terminal/src/tty/windows/blocking.rs new file mode 100644 index 00000000..3c74be4a --- /dev/null +++ b/alacritty_terminal/src/tty/windows/blocking.rs @@ -0,0 +1,276 @@ +//! Code for running a reader/writer on another thread while driving it through `polling`. + +use std::io::prelude::*; +use std::marker::PhantomData; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Wake, Waker}; +use std::{io, thread}; + +use piper::{pipe, Reader, Writer}; +use polling::os::iocp::{CompletionPacket, PollerIocpExt}; +use polling::{Event, PollMode, Poller}; + +use crate::thread::spawn_named; + +struct Registration { + interest: Mutex<Option<Interest>>, + end: PipeEnd, +} + +#[derive(Copy, Clone)] +enum PipeEnd { + Reader, + Writer, +} + +struct Interest { + /// The event to send about completion. + event: Event, + + /// The poller to send the event to. + poller: Arc<Poller>, + + /// The mode that we are in. + mode: PollMode, +} + +/// Poll a reader in another thread. +pub struct UnblockedReader<R> { + /// The event to send about completion. + interest: Arc<Registration>, + + /// The pipe that we are reading from. + pipe: Reader, + + /// Is this the first time registering? + first_register: bool, + + /// We logically own the reader, but we don't actually use it. + _reader: PhantomData<R>, +} + +impl<R: Read + Send + 'static> UnblockedReader<R> { + /// Spawn a new unblocked reader. + pub fn new(mut source: R, pipe_capacity: usize) -> Self { + // Create a new pipe. + let (reader, mut writer) = pipe(pipe_capacity); + let interest = Arc::new(Registration { + interest: Mutex::<Option<Interest>>::new(None), + end: PipeEnd::Reader, + }); + + // Spawn the reader thread. + spawn_named("alacritty-tty-reader-thread", move || { + let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); + let mut context = Context::from_waker(&waker); + + loop { + // Read from the reader into the pipe. + match writer.poll_fill(&mut context, &mut source) { + Poll::Ready(Ok(0)) => { + // Either the pipe is closed or the reader is at its EOF. + // In any case, we are done. + return; + }, + + Poll::Ready(Ok(_)) => { + // Keep reading. + continue; + }, + + Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { + // We were interrupted; continue. + continue; + }, + + Poll::Ready(Err(e)) => { + log::error!("error writing to pipe: {}", e); + return; + }, + + Poll::Pending => { + // We are now waiting on the other end to advance. Park the + // thread until they do. + thread::park(); + }, + } + } + }); + + Self { interest, pipe: reader, first_register: true, _reader: PhantomData } + } + + /// Register interest in the reader. + pub fn register(&mut self, poller: &Arc<Poller>, event: Event, mode: PollMode) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = Some(Interest { event, poller: poller.clone(), mode }); + + // Send the event to start off with if we have any data. + if (!self.pipe.is_empty() && event.readable) || self.first_register { + self.first_register = false; + poller.post(CompletionPacket::new(event)).ok(); + } + } + + /// Deregister interest in the reader. + pub fn deregister(&self) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = None; + } + + /// Try to read from the reader. + pub fn try_read(&mut self, buf: &mut [u8]) -> usize { + let waker = Waker::from(self.interest.clone()); + + match self.pipe.poll_drain_bytes(&mut Context::from_waker(&waker), buf) { + Poll::Pending => 0, + Poll::Ready(n) => n, + } + } +} + +impl<R: Read + Send + 'static> Read for UnblockedReader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + Ok(self.try_read(buf)) + } +} + +/// Poll a writer in another thread. +pub struct UnblockedWriter<W> { + /// The interest to send about completion. + interest: Arc<Registration>, + + /// The pipe that we are writing to. + pipe: Writer, + + /// We logically own the writer, but we don't actually use it. + _reader: PhantomData<W>, +} + +impl<W: Write + Send + 'static> UnblockedWriter<W> { + /// Spawn a new unblocked writer. + pub fn new(mut sink: W, pipe_capacity: usize) -> Self { + // Create a new pipe. + let (mut reader, writer) = pipe(pipe_capacity); + let interest = Arc::new(Registration { + interest: Mutex::<Option<Interest>>::new(None), + end: PipeEnd::Writer, + }); + + // Spawn the writer thread. + spawn_named("alacritty-tty-writer-thread", move || { + let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); + let mut context = Context::from_waker(&waker); + + loop { + // Write from the pipe into the writer. + match reader.poll_drain(&mut context, &mut sink) { + Poll::Ready(Ok(0)) => { + // Either the pipe is closed or the writer is full. + // In any case, we are done. + return; + }, + + Poll::Ready(Ok(_)) => { + // Keep writing. + continue; + }, + + Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { + // We were interrupted; continue. + continue; + }, + + Poll::Ready(Err(e)) => { + log::error!("error writing to pipe: {}", e); + return; + }, + + Poll::Pending => { + // We are now waiting on the other end to advance. Park the + // thread until they do. + thread::park(); + }, + } + } + }); + + Self { interest, pipe: writer, _reader: PhantomData } + } + + /// Register interest in the writer. + pub fn register(&self, poller: &Arc<Poller>, event: Event, mode: PollMode) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = Some(Interest { event, poller: poller.clone(), mode }); + + // Send the event to start off with if we have room for data. + if !self.pipe.is_full() && event.writable { + poller.post(CompletionPacket::new(event)).ok(); + } + } + + /// Deregister interest in the writer. + pub fn deregister(&self) { + let mut interest = self.interest.interest.lock().unwrap(); + *interest = None; + } + + /// Try to write to the writer. + pub fn try_write(&mut self, buf: &[u8]) -> usize { + let waker = Waker::from(self.interest.clone()); + + match self.pipe.poll_fill_bytes(&mut Context::from_waker(&waker), buf) { + Poll::Pending => 0, + Poll::Ready(n) => n, + } + } +} + +impl<W: Write + Send + 'static> Write for UnblockedWriter<W> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + Ok(self.try_write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + // Nothing to flush. + Ok(()) + } +} + +struct ThreadWaker(thread::Thread); + +impl Wake for ThreadWaker { + fn wake(self: Arc<Self>) { + self.0.unpark(); + } + + fn wake_by_ref(self: &Arc<Self>) { + self.0.unpark(); + } +} + +impl Wake for Registration { + fn wake(self: Arc<Self>) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc<Self>) { + let mut interest_lock = self.interest.lock().unwrap(); + if let Some(interest) = interest_lock.as_ref() { + // Send the event to the poller. + let send_event = match self.end { + PipeEnd::Reader => interest.event.readable, + PipeEnd::Writer => interest.event.writable, + }; + + if send_event { + interest.poller.post(CompletionPacket::new(interest.event)).ok(); + + // Clear the event if we're in oneshot mode. + if matches!(interest.mode, PollMode::Oneshot | PollMode::EdgeOneshot) { + *interest_lock = None; + } + } + } + } +} |