1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
use std::ffi::c_void;
use std::io::Error;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use polling::os::iocp::{CompletionPacket, PollerIocpExt};
use polling::{Event, Poller};
use windows_sys::Win32::Foundation::{BOOLEAN, HANDLE};
use windows_sys::Win32::System::Threading::{
RegisterWaitForSingleObject, UnregisterWait, INFINITE, WT_EXECUTEINWAITTHREAD,
WT_EXECUTEONLYONCE,
};
use crate::tty::ChildEvent;
struct Interest {
poller: Arc<Poller>,
event: Event,
}
struct ChildExitSender {
sender: mpsc::Sender<ChildEvent>,
interest: Arc<Mutex<Option<Interest>>>,
}
/// WinAPI callback to run when child process exits.
extern "system" fn child_exit_callback(ctx: *mut c_void, timed_out: BOOLEAN) {
if timed_out != 0 {
return;
}
let event_tx: Box<_> = unsafe { Box::from_raw(ctx as *mut ChildExitSender) };
let _ = event_tx.sender.send(ChildEvent::Exited);
let interest = event_tx.interest.lock().unwrap();
if let Some(interest) = interest.as_ref() {
interest.poller.post(CompletionPacket::new(interest.event)).ok();
}
}
pub struct ChildExitWatcher {
wait_handle: AtomicPtr<c_void>,
event_rx: mpsc::Receiver<ChildEvent>,
interest: Arc<Mutex<Option<Interest>>>,
}
impl ChildExitWatcher {
pub fn new(child_handle: HANDLE) -> Result<ChildExitWatcher, Error> {
let (event_tx, event_rx) = mpsc::channel();
let mut wait_handle: HANDLE = 0;
let interest = Arc::new(Mutex::new(None));
let sender_ref = Box::new(ChildExitSender { sender: event_tx, interest: interest.clone() });
let success = unsafe {
RegisterWaitForSingleObject(
&mut wait_handle,
child_handle,
Some(child_exit_callback),
Box::into_raw(sender_ref).cast(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if success == 0 {
Err(Error::last_os_error())
} else {
Ok(ChildExitWatcher {
wait_handle: AtomicPtr::from(wait_handle as *mut c_void),
event_rx,
interest,
})
}
}
pub fn event_rx(&self) -> &mpsc::Receiver<ChildEvent> {
&self.event_rx
}
pub fn register(&self, poller: &Arc<Poller>, event: Event) {
*self.interest.lock().unwrap() = Some(Interest { poller: poller.clone(), event });
}
pub fn deregister(&self) {
*self.interest.lock().unwrap() = None;
}
}
impl Drop for ChildExitWatcher {
fn drop(&mut self) {
unsafe {
UnregisterWait(self.wait_handle.load(Ordering::Relaxed) as HANDLE);
}
}
}
#[cfg(test)]
mod tests {
use std::os::windows::io::AsRawHandle;
use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
use super::super::PTY_CHILD_EVENT_TOKEN;
use super::*;
#[test]
pub fn event_is_emitted_when_child_exits() {
const WAIT_TIMEOUT: Duration = Duration::from_millis(200);
let poller = Arc::new(Poller::new().unwrap());
let mut child = Command::new("cmd.exe").spawn().unwrap();
let child_exit_watcher = ChildExitWatcher::new(child.as_raw_handle() as HANDLE).unwrap();
child_exit_watcher.register(&poller, Event::readable(PTY_CHILD_EVENT_TOKEN));
child.kill().unwrap();
// Poll for the event or fail with timeout if nothing has been sent.
let mut events = polling::Events::new();
poller.wait(&mut events, Some(WAIT_TIMEOUT)).unwrap();
assert_eq!(events.iter().next().unwrap().key, PTY_CHILD_EVENT_TOKEN);
// Verify that at least one `ChildEvent::Exited` was received.
assert_eq!(child_exit_watcher.event_rx().try_recv(), Ok(ChildEvent::Exited));
}
}
|