mirror of
https://github.com/denoland/deno.git
synced 2024-11-29 16:30:56 -05:00
7c2e7c6608
Provides a concrete state type that can be dynamically added. This is necessary for op crates.
* Renames `BasicState` to `OpState`.
* Async ops take `Rc<RefCell<OpState>>`.
* Sync ops take `&mut OpState`.
* Removes the `OpRegistry` and `OpRouter` traits.
* `get_error_class_fn` moved to `OpState`.
* `ResourceTable` moved to `OpState`.
478 lines
14 KiB
Rust
478 lines
14 KiB
Rust
use super::dispatch_minimal::minimal_op;
|
|
use super::dispatch_minimal::MinimalOp;
|
|
use crate::http_util::HttpBody;
|
|
use crate::metrics::metrics_op;
|
|
use deno_core::BufVec;
|
|
use deno_core::ErrBox;
|
|
use deno_core::JsRuntime;
|
|
use deno_core::OpState;
|
|
use futures::future::poll_fn;
|
|
use futures::future::FutureExt;
|
|
use futures::ready;
|
|
use std::cell::RefCell;
|
|
use std::collections::HashMap;
|
|
use std::pin::Pin;
|
|
use std::rc::Rc;
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
use std::task::Context;
|
|
use std::task::Poll;
|
|
use tokio::io::{AsyncRead, AsyncWrite};
|
|
use tokio::net::TcpStream;
|
|
use tokio_rustls::client::TlsStream as ClientTlsStream;
|
|
use tokio_rustls::server::TlsStream as ServerTlsStream;
|
|
|
|
#[cfg(not(windows))]
|
|
use std::os::unix::io::FromRawFd;
|
|
|
|
#[cfg(windows)]
|
|
use std::os::windows::io::FromRawHandle;
|
|
|
|
#[cfg(windows)]
|
|
extern crate winapi;
|
|
|
|
lazy_static! {
  // The standard streams are wrapped in `std::fs::File`s created from their
  // raw file descriptors (raw handles on Windows). The caveat of that
  // approach is that dropping such a `File` closes the underlying
  // descriptor, which is highly undesirable for stdio. That is why the
  // originals are stored in these globals and only clones are handed out
  // when obtaining stdio for the process. When a resource table holding a
  // clone is dropped, the original handle is not closed (so Deno.core.print
  // will still work).
  // TODO(ry) It should be possible to close stdout.

  /// Global handle for the process's stdin.
  ///
  /// On Windows this is `None` when `GetStdHandle` reports no usable handle
  /// (NULL or `INVALID_HANDLE_VALUE`).
  static ref STDIN_HANDLE: Option<std::fs::File> = {
    // SAFETY: fd 0 is assumed to be the process's stdin. The resulting
    // `File` is stored in this global, so the descriptor is not closed when
    // clones of it are dropped.
    #[cfg(not(windows))]
    let stdin = unsafe { Some(std::fs::File::from_raw_fd(0)) };
    // SAFETY: the handle is only wrapped after checking that it is neither
    // NULL (no associated standard handle) nor `INVALID_HANDLE_VALUE`
    // (`GetStdHandle` failed; winapi defines it as `-1isize as HANDLE`).
    #[cfg(windows)]
    let stdin = unsafe {
      let handle = winapi::um::processenv::GetStdHandle(
        winapi::um::winbase::STD_INPUT_HANDLE,
      );
      if handle.is_null() || handle as isize == -1 {
        None
      } else {
        Some(std::fs::File::from_raw_handle(handle))
      }
    };
    stdin
  };

  /// Global handle for the process's stdout.
  ///
  /// On Windows this is `None` when `GetStdHandle` reports no usable handle
  /// (NULL or `INVALID_HANDLE_VALUE`).
  static ref STDOUT_HANDLE: Option<std::fs::File> = {
    // SAFETY: fd 1 is assumed to be the process's stdout. The resulting
    // `File` is stored in this global, so the descriptor is not closed when
    // clones of it are dropped.
    #[cfg(not(windows))]
    let stdout = unsafe { Some(std::fs::File::from_raw_fd(1)) };
    // SAFETY: the handle is only wrapped after checking that it is neither
    // NULL nor `INVALID_HANDLE_VALUE` (see STDIN_HANDLE).
    #[cfg(windows)]
    let stdout = unsafe {
      let handle = winapi::um::processenv::GetStdHandle(
        winapi::um::winbase::STD_OUTPUT_HANDLE,
      );
      if handle.is_null() || handle as isize == -1 {
        None
      } else {
        Some(std::fs::File::from_raw_handle(handle))
      }
    };
    stdout
  };

  /// Global handle for the process's stderr.
  ///
  /// On Windows this is `None` when `GetStdHandle` reports no usable handle
  /// (NULL or `INVALID_HANDLE_VALUE`).
  static ref STDERR_HANDLE: Option<std::fs::File> = {
    // SAFETY: fd 2 is assumed to be the process's stderr. The resulting
    // `File` is stored in this global, so the descriptor is not closed when
    // clones of it are dropped.
    #[cfg(not(windows))]
    let stderr = unsafe { Some(std::fs::File::from_raw_fd(2)) };
    // SAFETY: the handle is only wrapped after checking that it is neither
    // NULL nor `INVALID_HANDLE_VALUE` (see STDIN_HANDLE).
    #[cfg(windows)]
    let stderr = unsafe {
      let handle = winapi::um::processenv::GetStdHandle(
        winapi::um::winbase::STD_ERROR_HANDLE,
      );
      if handle.is_null() || handle as isize == -1 {
        None
      } else {
        Some(std::fs::File::from_raw_handle(handle))
      }
    };
    stderr
  };
}
|
|
|
|
/// Registers the `op_read` and `op_write` ops on the given runtime.
///
/// Both handlers are wrapped with `minimal_op` and then `metrics_op` before
/// being registered.
pub fn init(rt: &mut JsRuntime) {
  let read_op = metrics_op(minimal_op(op_read));
  rt.register_op("op_read", read_op);

  let write_op = metrics_op(minimal_op(op_write));
  rt.register_op("op_write", write_op);
}
|
|
|
|
pub fn get_stdio() -> (
|
|
Option<StreamResourceHolder>,
|
|
Option<StreamResourceHolder>,
|
|
Option<StreamResourceHolder>,
|
|
) {
|
|
let stdin = get_stdio_stream(&STDIN_HANDLE);
|
|
let stdout = get_stdio_stream(&STDOUT_HANDLE);
|
|
let stderr = get_stdio_stream(&STDERR_HANDLE);
|
|
|
|
(stdin, stdout, stderr)
|
|
}
|
|
|
|
fn get_stdio_stream(
|
|
handle: &Option<std::fs::File>,
|
|
) -> Option<StreamResourceHolder> {
|
|
match handle {
|
|
None => None,
|
|
Some(file_handle) => match file_handle.try_clone() {
|
|
Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile(
|
|
Some((tokio::fs::File::from_std(clone), FileMetadata::default())),
|
|
))),
|
|
Err(_e) => None,
|
|
},
|
|
}
|
|
}
|
|
|
|
fn no_buffer_specified() -> ErrBox {
|
|
ErrBox::type_error("no buffer specified")
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
use nix::sys::termios;
|
|
|
|
#[derive(Default)]
|
|
pub struct TTYMetadata {
|
|
#[cfg(unix)]
|
|
pub mode: Option<termios::Termios>,
|
|
}
|
|
|
|
#[derive(Default)]
|
|
pub struct FileMetadata {
|
|
pub tty: TTYMetadata,
|
|
}
|
|
|
|
pub struct StreamResourceHolder {
|
|
pub resource: StreamResource,
|
|
waker: HashMap<usize, futures::task::AtomicWaker>,
|
|
waker_counter: AtomicUsize,
|
|
}
|
|
|
|
impl StreamResourceHolder {
|
|
pub fn new(resource: StreamResource) -> StreamResourceHolder {
|
|
StreamResourceHolder {
|
|
resource,
|
|
// Atleast one task is expecter for the resource
|
|
waker: HashMap::with_capacity(1),
|
|
// Tracks wakers Ids
|
|
waker_counter: AtomicUsize::new(0),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for StreamResourceHolder {
|
|
fn drop(&mut self) {
|
|
self.wake_tasks();
|
|
}
|
|
}
|
|
|
|
impl StreamResourceHolder {
|
|
pub fn track_task(&mut self, cx: &Context) -> Result<usize, ErrBox> {
|
|
let waker = futures::task::AtomicWaker::new();
|
|
waker.register(cx.waker());
|
|
// Its OK if it overflows
|
|
let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
|
|
self.waker.insert(task_waker_id, waker);
|
|
Ok(task_waker_id)
|
|
}
|
|
|
|
pub fn wake_tasks(&mut self) {
|
|
for waker in self.waker.values() {
|
|
waker.wake();
|
|
}
|
|
}
|
|
|
|
pub fn untrack_task(&mut self, task_waker_id: usize) {
|
|
self.waker.remove(&task_waker_id);
|
|
}
|
|
}
|
|
|
|
pub enum StreamResource {
|
|
Stdin(tokio::io::Stdin, TTYMetadata),
|
|
FsFile(Option<(tokio::fs::File, FileMetadata)>),
|
|
TcpStream(Option<tokio::net::TcpStream>),
|
|
#[cfg(not(windows))]
|
|
UnixStream(tokio::net::UnixStream),
|
|
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
|
|
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
|
|
HttpBody(Box<HttpBody>),
|
|
ChildStdin(tokio::process::ChildStdin),
|
|
ChildStdout(tokio::process::ChildStdout),
|
|
ChildStderr(tokio::process::ChildStderr),
|
|
}
|
|
|
|
trait UnpinAsyncRead: AsyncRead + Unpin {}
|
|
trait UnpinAsyncWrite: AsyncWrite + Unpin {}
|
|
|
|
impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {}
|
|
impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {}
|
|
|
|
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
|
|
/// but uses an `ErrBox` error instead of `std::io:Error`
|
|
pub trait DenoAsyncRead {
|
|
fn poll_read(
|
|
&mut self,
|
|
cx: &mut Context,
|
|
buf: &mut [u8],
|
|
) -> Poll<Result<usize, ErrBox>>;
|
|
}
|
|
|
|
impl DenoAsyncRead for StreamResource {
|
|
fn poll_read(
|
|
&mut self,
|
|
cx: &mut Context,
|
|
buf: &mut [u8],
|
|
) -> Poll<Result<usize, ErrBox>> {
|
|
use StreamResource::*;
|
|
let f: &mut dyn UnpinAsyncRead = match self {
|
|
FsFile(Some((f, _))) => f,
|
|
FsFile(None) => return Poll::Ready(Err(ErrBox::resource_unavailable())),
|
|
Stdin(f, _) => f,
|
|
TcpStream(Some(f)) => f,
|
|
#[cfg(not(windows))]
|
|
UnixStream(f) => f,
|
|
ClientTlsStream(f) => f,
|
|
ServerTlsStream(f) => f,
|
|
ChildStdout(f) => f,
|
|
ChildStderr(f) => f,
|
|
HttpBody(f) => f,
|
|
_ => return Err(ErrBox::bad_resource_id()).into(),
|
|
};
|
|
let v = ready!(Pin::new(f).poll_read(cx, buf))?;
|
|
Ok(v).into()
|
|
}
|
|
}
|
|
|
|
pub fn op_read(
|
|
state: Rc<RefCell<OpState>>,
|
|
is_sync: bool,
|
|
rid: i32,
|
|
mut zero_copy: BufVec,
|
|
) -> MinimalOp {
|
|
debug!("read rid={}", rid);
|
|
match zero_copy.len() {
|
|
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
|
|
1 => {}
|
|
_ => panic!("Invalid number of arguments"),
|
|
}
|
|
|
|
if is_sync {
|
|
MinimalOp::Sync({
|
|
// First we look up the rid in the resource table.
|
|
std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r {
|
|
Ok(std_file) => {
|
|
use std::io::Read;
|
|
std_file
|
|
.read(&mut zero_copy[0])
|
|
.map(|n: usize| n as i32)
|
|
.map_err(ErrBox::from)
|
|
}
|
|
Err(_) => {
|
|
Err(ErrBox::type_error("sync read not allowed on this resource"))
|
|
}
|
|
})
|
|
})
|
|
} else {
|
|
let mut zero_copy = zero_copy[0].clone();
|
|
MinimalOp::Async(
|
|
poll_fn(move |cx| {
|
|
let mut state = state.borrow_mut();
|
|
let resource_holder = state
|
|
.resource_table
|
|
.get_mut::<StreamResourceHolder>(rid as u32)
|
|
.ok_or_else(ErrBox::bad_resource_id)?;
|
|
|
|
let mut task_tracker_id: Option<usize> = None;
|
|
let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy)
|
|
{
|
|
Poll::Ready(t) => {
|
|
if let Some(id) = task_tracker_id {
|
|
resource_holder.untrack_task(id);
|
|
}
|
|
t
|
|
}
|
|
Poll::Pending => {
|
|
task_tracker_id.replace(resource_holder.track_task(cx)?);
|
|
return Poll::Pending;
|
|
}
|
|
}?;
|
|
Poll::Ready(Ok(nread as i32))
|
|
})
|
|
.boxed_local(),
|
|
)
|
|
}
|
|
}
|
|
|
|
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
|
|
/// but uses an `ErrBox` error instead of `std::io:Error`
|
|
pub trait DenoAsyncWrite {
|
|
fn poll_write(
|
|
&mut self,
|
|
cx: &mut Context,
|
|
buf: &[u8],
|
|
) -> Poll<Result<usize, ErrBox>>;
|
|
|
|
fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>>;
|
|
|
|
fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>>;
|
|
}
|
|
|
|
impl DenoAsyncWrite for StreamResource {
|
|
fn poll_write(
|
|
&mut self,
|
|
cx: &mut Context,
|
|
buf: &[u8],
|
|
) -> Poll<Result<usize, ErrBox>> {
|
|
use StreamResource::*;
|
|
let f: &mut dyn UnpinAsyncWrite = match self {
|
|
FsFile(Some((f, _))) => f,
|
|
FsFile(None) => return Poll::Pending,
|
|
TcpStream(Some(f)) => f,
|
|
#[cfg(not(windows))]
|
|
UnixStream(f) => f,
|
|
ClientTlsStream(f) => f,
|
|
ServerTlsStream(f) => f,
|
|
ChildStdin(f) => f,
|
|
_ => return Err(ErrBox::bad_resource_id()).into(),
|
|
};
|
|
|
|
let v = ready!(Pin::new(f).poll_write(cx, buf))?;
|
|
Ok(v).into()
|
|
}
|
|
|
|
fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>> {
|
|
use StreamResource::*;
|
|
let f: &mut dyn UnpinAsyncWrite = match self {
|
|
FsFile(Some((f, _))) => f,
|
|
FsFile(None) => return Poll::Pending,
|
|
TcpStream(Some(f)) => f,
|
|
#[cfg(not(windows))]
|
|
UnixStream(f) => f,
|
|
ClientTlsStream(f) => f,
|
|
ServerTlsStream(f) => f,
|
|
ChildStdin(f) => f,
|
|
_ => return Err(ErrBox::bad_resource_id()).into(),
|
|
};
|
|
|
|
ready!(Pin::new(f).poll_flush(cx))?;
|
|
Ok(()).into()
|
|
}
|
|
|
|
fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), ErrBox>> {
|
|
unimplemented!()
|
|
}
|
|
}
|
|
|
|
pub fn op_write(
|
|
state: Rc<RefCell<OpState>>,
|
|
is_sync: bool,
|
|
rid: i32,
|
|
zero_copy: BufVec,
|
|
) -> MinimalOp {
|
|
debug!("write rid={}", rid);
|
|
match zero_copy.len() {
|
|
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
|
|
1 => {}
|
|
_ => panic!("Invalid number of arguments"),
|
|
}
|
|
|
|
if is_sync {
|
|
MinimalOp::Sync({
|
|
// First we look up the rid in the resource table.
|
|
std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r {
|
|
Ok(std_file) => {
|
|
use std::io::Write;
|
|
std_file
|
|
.write(&zero_copy[0])
|
|
.map(|nwritten: usize| nwritten as i32)
|
|
.map_err(ErrBox::from)
|
|
}
|
|
Err(_) => {
|
|
Err(ErrBox::type_error("sync read not allowed on this resource"))
|
|
}
|
|
})
|
|
})
|
|
} else {
|
|
let zero_copy = zero_copy[0].clone();
|
|
MinimalOp::Async(
|
|
async move {
|
|
let nwritten = poll_fn(|cx| {
|
|
let mut state = state.borrow_mut();
|
|
let resource_holder = state
|
|
.resource_table
|
|
.get_mut::<StreamResourceHolder>(rid as u32)
|
|
.ok_or_else(ErrBox::bad_resource_id)?;
|
|
resource_holder.resource.poll_write(cx, &zero_copy)
|
|
})
|
|
.await?;
|
|
|
|
// TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
|
|
// and the reasons for the need to explicitly flush are not fully known.
|
|
// Figure out why it's needed and preferably remove it.
|
|
// https://github.com/denoland/deno/issues/3565
|
|
poll_fn(|cx| {
|
|
let mut state = state.borrow_mut();
|
|
let resource_holder = state
|
|
.resource_table
|
|
.get_mut::<StreamResourceHolder>(rid as u32)
|
|
.ok_or_else(ErrBox::bad_resource_id)?;
|
|
resource_holder.resource.poll_flush(cx)
|
|
})
|
|
.await?;
|
|
|
|
Ok(nwritten as i32)
|
|
}
|
|
.boxed_local(),
|
|
)
|
|
}
|
|
}
|
|
|
|
/// Helper function for operating on a std::fs::File stored in the resource table.
|
|
///
|
|
/// We store file system file resources as tokio::fs::File, so this is a little
|
|
/// utility function that gets a std::fs:File when you need to do blocking
|
|
/// operations.
|
|
///
|
|
/// Returns ErrorKind::Busy if the resource is being used by another op.
|
|
pub fn std_file_resource<F, T>(
|
|
state: &mut OpState,
|
|
rid: u32,
|
|
mut f: F,
|
|
) -> Result<T, ErrBox>
|
|
where
|
|
F:
|
|
FnMut(Result<&mut std::fs::File, &mut StreamResource>) -> Result<T, ErrBox>,
|
|
{
|
|
// First we look up the rid in the resource table.
|
|
let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid);
|
|
if let Some(ref mut resource_holder) = r {
|
|
// Sync write only works for FsFile. It doesn't make sense to do this
|
|
// for non-blocking sockets. So we error out if not FsFile.
|
|
match &mut resource_holder.resource {
|
|
StreamResource::FsFile(option_file_metadata) => {
|
|
// The object in the resource table is a tokio::fs::File - but in
|
|
// order to do a blocking write on it, we must turn it into a
|
|
// std::fs::File. Hopefully this code compiles down to nothing.
|
|
if let Some((tokio_file, metadata)) = option_file_metadata.take() {
|
|
match tokio_file.try_into_std() {
|
|
Ok(mut std_file) => {
|
|
let result = f(Ok(&mut std_file));
|
|
// Turn the std_file handle back into a tokio file, put it back
|
|
// in the resource table.
|
|
let tokio_file = tokio::fs::File::from_std(std_file);
|
|
resource_holder.resource =
|
|
StreamResource::FsFile(Some((tokio_file, metadata)));
|
|
// return the result.
|
|
result
|
|
}
|
|
Err(tokio_file) => {
|
|
// This function will return an error containing the file if
|
|
// some operation is in-flight.
|
|
resource_holder.resource =
|
|
StreamResource::FsFile(Some((tokio_file, metadata)));
|
|
Err(ErrBox::resource_unavailable())
|
|
}
|
|
}
|
|
} else {
|
|
Err(ErrBox::resource_unavailable())
|
|
}
|
|
}
|
|
_ => f(Err(&mut resource_holder.resource)),
|
|
}
|
|
} else {
|
|
Err(ErrBox::bad_resource_id())
|
|
}
|
|
}
|