//! Code for running a reader/writer on another thread while driving it through `polling`. use std::io::prelude::*; use std::marker::PhantomData; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Wake, Waker}; use std::{io, thread}; use piper::{pipe, Reader, Writer}; use polling::os::iocp::{CompletionPacket, PollerIocpExt}; use polling::{Event, PollMode, Poller}; use crate::thread::spawn_named; struct Registration { interest: Mutex>, end: PipeEnd, } #[derive(Copy, Clone)] enum PipeEnd { Reader, Writer, } struct Interest { /// The event to send about completion. event: Event, /// The poller to send the event to. poller: Arc, /// The mode that we are in. mode: PollMode, } /// Poll a reader in another thread. pub struct UnblockedReader { /// The event to send about completion. interest: Arc, /// The pipe that we are reading from. pipe: Reader, /// Is this the first time registering? first_register: bool, /// We logically own the reader, but we don't actually use it. _reader: PhantomData, } impl UnblockedReader { /// Spawn a new unblocked reader. pub fn new(mut source: R, pipe_capacity: usize) -> Self { // Create a new pipe. let (reader, mut writer) = pipe(pipe_capacity); let interest = Arc::new(Registration { interest: Mutex::>::new(None), end: PipeEnd::Reader, }); // Spawn the reader thread. spawn_named("alacritty-tty-reader-thread", move || { let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); let mut context = Context::from_waker(&waker); loop { // Read from the reader into the pipe. match writer.poll_fill(&mut context, &mut source) { Poll::Ready(Ok(0)) => { // Either the pipe is closed or the reader is at its EOF. // In any case, we are done. return; }, Poll::Ready(Ok(_)) => { // Keep reading. continue; }, Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { // We were interrupted; continue. continue; }, Poll::Ready(Err(e)) => { log::error!("error writing to pipe: {}", e); return; }, Poll::Pending => { // We are now waiting on the other end to advance. Park the // thread until they do. thread::park(); }, } } }); Self { interest, pipe: reader, first_register: true, _reader: PhantomData } } /// Register interest in the reader. pub fn register(&mut self, poller: &Arc, event: Event, mode: PollMode) { let mut interest = self.interest.interest.lock().unwrap(); *interest = Some(Interest { event, poller: poller.clone(), mode }); // Send the event to start off with if we have any data. if (!self.pipe.is_empty() && event.readable) || self.first_register { self.first_register = false; poller.post(CompletionPacket::new(event)).ok(); } } /// Deregister interest in the reader. pub fn deregister(&self) { let mut interest = self.interest.interest.lock().unwrap(); *interest = None; } /// Try to read from the reader. pub fn try_read(&mut self, buf: &mut [u8]) -> usize { let waker = Waker::from(self.interest.clone()); match self.pipe.poll_drain_bytes(&mut Context::from_waker(&waker), buf) { Poll::Pending => 0, Poll::Ready(n) => n, } } } impl Read for UnblockedReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { Ok(self.try_read(buf)) } } /// Poll a writer in another thread. pub struct UnblockedWriter { /// The interest to send about completion. interest: Arc, /// The pipe that we are writing to. pipe: Writer, /// We logically own the writer, but we don't actually use it. _reader: PhantomData, } impl UnblockedWriter { /// Spawn a new unblocked writer. pub fn new(mut sink: W, pipe_capacity: usize) -> Self { // Create a new pipe. let (mut reader, writer) = pipe(pipe_capacity); let interest = Arc::new(Registration { interest: Mutex::>::new(None), end: PipeEnd::Writer, }); // Spawn the writer thread. spawn_named("alacritty-tty-writer-thread", move || { let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); let mut context = Context::from_waker(&waker); loop { // Write from the pipe into the writer. match reader.poll_drain(&mut context, &mut sink) { Poll::Ready(Ok(0)) => { // Either the pipe is closed or the writer is full. // In any case, we are done. return; }, Poll::Ready(Ok(_)) => { // Keep writing. continue; }, Poll::Ready(Err(e)) if e.kind() == io::ErrorKind::Interrupted => { // We were interrupted; continue. continue; }, Poll::Ready(Err(e)) => { log::error!("error writing to pipe: {}", e); return; }, Poll::Pending => { // We are now waiting on the other end to advance. Park the // thread until they do. thread::park(); }, } } }); Self { interest, pipe: writer, _reader: PhantomData } } /// Register interest in the writer. pub fn register(&self, poller: &Arc, event: Event, mode: PollMode) { let mut interest = self.interest.interest.lock().unwrap(); *interest = Some(Interest { event, poller: poller.clone(), mode }); // Send the event to start off with if we have room for data. if !self.pipe.is_full() && event.writable { poller.post(CompletionPacket::new(event)).ok(); } } /// Deregister interest in the writer. pub fn deregister(&self) { let mut interest = self.interest.interest.lock().unwrap(); *interest = None; } /// Try to write to the writer. pub fn try_write(&mut self, buf: &[u8]) -> usize { let waker = Waker::from(self.interest.clone()); match self.pipe.poll_fill_bytes(&mut Context::from_waker(&waker), buf) { Poll::Pending => 0, Poll::Ready(n) => n, } } } impl Write for UnblockedWriter { fn write(&mut self, buf: &[u8]) -> io::Result { Ok(self.try_write(buf)) } fn flush(&mut self) -> io::Result<()> { // Nothing to flush. Ok(()) } } struct ThreadWaker(thread::Thread); impl Wake for ThreadWaker { fn wake(self: Arc) { self.0.unpark(); } fn wake_by_ref(self: &Arc) { self.0.unpark(); } } impl Wake for Registration { fn wake(self: Arc) { self.wake_by_ref(); } fn wake_by_ref(self: &Arc) { let mut interest_lock = self.interest.lock().unwrap(); if let Some(interest) = interest_lock.as_ref() { // Send the event to the poller. let send_event = match self.end { PipeEnd::Reader => interest.event.readable, PipeEnd::Writer => interest.event.writable, }; if send_event { interest.poller.post(CompletionPacket::new(interest.event)).ok(); // Clear the event if we're in oneshot mode. if matches!(interest.mode, PollMode::Oneshot | PollMode::EdgeOneshot) { *interest_lock = None; } } } } }