1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-15 10:35:19 -05:00
denoland-deno/runtime/ops/io.rs
Luca Casonato 3b6b75bb46
feat(core): improve resource read & write traits (#16115)
This commit introduces two new buffer wrapper types to `deno_core`. The
main benefit of these new wrappers is that they can wrap a number of
different underlying buffer types. This allows for a more flexible read
and write API on resources that will require less copying of data
between different buffer representations.

- `BufView` is a read-only view onto a buffer. It can be backed by
`ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`.
- `BufViewMut` is a read-write view onto a buffer. It can be cheaply
converted into a `BufView`. It can be backed by `ZeroCopyBuf` or
`Vec<u8>`.

Both new buffer views have a cursor. This means that the start point of
the view can be constrained to write / read from just a slice of the
view. Only the start point of the slice can be adjusted. The end point
is fixed. To adjust the end point, the underlying buffer needs to be
truncated.

Readable resources have been changed to better cater to resources that
do not support BYOB reads. The basic `read` method now returns a
`BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the
operation to return buffers that the resource has already allocated,
instead of forcing the caller to allocate the buffer. BYOB reads are
still very useful for resources that support them, so a new `read_byob`
method has been added that takes a `BufViewMut` to fill. `op_read`
attempts to use `read_byob` if the resource supports it, which falls
back to `read` and performs an additional copy if it does not. For
Rust->JS reads this change should have no impact, but for Rust->Rust
reads, this allows the caller to avoid an additional copy in many
scenarios. This combined with the support for `BufView` to be backed by
`bytes::Bytes` allows us to avoid one data copy when piping from a
`fetch` response into an `ext/http` response.

Writable resources have been changed to take a `BufView` instead of a
`ZeroCopyBuf` as an argument. This allows for less copying of data in
certain scenarios, as described above. Additionally a new
`Resource::write_all` method has been added that takes a `BufView` and
continually attempts to write the resource until the entire buffer has
been written. Certain resources like files can override this method to
provide a more efficient `write_all` implementation.
2022-10-09 14:49:25 +00:00

716 lines
18 KiB
Rust

// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use deno_core::error::resource_unavailable;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::parking_lot::Mutex;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufMutView;
use deno_core::BufView;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::File as StdFile;
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::rc::Rc;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::process;
#[cfg(unix)]
use std::os::unix::io::FromRawFd;
#[cfg(windows)]
use {
std::os::windows::io::FromRawHandle,
winapi::um::{processenv::GetStdHandle, winbase},
};
// Store the stdio fd/handles in global statics in order to keep them
// alive for the duration of the application since the last handle/fd
// being dropped will close the corresponding pipe.
#[cfg(unix)]
static STDIN_HANDLE: Lazy<StdFile> = Lazy::new(|| {
// SAFETY: corresponds to OS stdin
unsafe { StdFile::from_raw_fd(0) }
});
#[cfg(unix)]
static STDOUT_HANDLE: Lazy<StdFile> = Lazy::new(|| {
// SAFETY: corresponds to OS stdout
unsafe { StdFile::from_raw_fd(1) }
});
#[cfg(unix)]
static STDERR_HANDLE: Lazy<StdFile> = Lazy::new(|| {
// SAFETY: corresponds to OS stderr
unsafe { StdFile::from_raw_fd(2) }
});
#[cfg(windows)]
static STDIN_HANDLE: Lazy<StdFile> = Lazy::new(|| {
// SAFETY: corresponds to OS stdin
unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_INPUT_HANDLE)) }
});
#[cfg(windows)]
static STDOUT_HANDLE: Lazy<StdFile> = Lazy::new(|| {
// SAFETY: corresponds to OS stdout
unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_OUTPUT_HANDLE)) }
});
#[cfg(windows)]
static STDERR_HANDLE: Lazy<StdFile> = Lazy::new(|| {
// SAFETY: corresponds to OS stderr
unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_ERROR_HANDLE)) }
});
pub fn init() -> Extension {
Extension::builder()
.ops(vec![op_read_sync::decl(), op_write_sync::decl()])
.build()
}
pub enum StdioPipe {
Inherit,
File(StdFile),
}
impl Default for StdioPipe {
fn default() -> Self {
Self::Inherit
}
}
impl Clone for StdioPipe {
fn clone(&self) -> Self {
match self {
StdioPipe::Inherit => StdioPipe::Inherit,
StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()),
}
}
}
/// Specify how stdin, stdout, and stderr are piped.
/// By default, inherits from the process.
#[derive(Clone, Default)]
pub struct Stdio {
pub stdin: StdioPipe,
pub stdout: StdioPipe,
pub stderr: StdioPipe,
}
pub fn init_stdio(stdio: Stdio) -> Extension {
// todo(dsheret): don't do this? Taking out the writers was necessary to prevent invalid handle panics
let stdio = Rc::new(RefCell::new(Some(stdio)));
Extension::builder()
.middleware(|op| match op.name {
"op_print" => op_print::decl(),
_ => op,
})
.state(move |state| {
let stdio = stdio
.borrow_mut()
.take()
.expect("Extension only supports being used once.");
let t = &mut state.resource_table;
let rid = t.add(StdFileResource::stdio(
match stdio.stdin {
StdioPipe::Inherit => StdFileResourceInner {
kind: StdFileResourceKind::Stdin,
file: STDIN_HANDLE.try_clone().unwrap(),
},
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stdin",
));
assert_eq!(rid, 0, "stdin must have ResourceId 0");
let rid = t.add(StdFileResource::stdio(
match stdio.stdout {
StdioPipe::Inherit => StdFileResourceInner {
kind: StdFileResourceKind::Stdout,
file: STDOUT_HANDLE.try_clone().unwrap(),
},
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stdout",
));
assert_eq!(rid, 1, "stdout must have ResourceId 1");
let rid = t.add(StdFileResource::stdio(
match stdio.stderr {
StdioPipe::Inherit => StdFileResourceInner {
kind: StdFileResourceKind::Stderr,
file: STDERR_HANDLE.try_clone().unwrap(),
},
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stderr",
));
assert_eq!(rid, 2, "stderr must have ResourceId 2");
Ok(())
})
.build()
}
#[cfg(unix)]
use nix::sys::termios;
use super::utils::TaskQueue;
#[derive(Default)]
pub struct TtyMetadata {
#[cfg(unix)]
pub mode: Option<termios::Termios>,
}
#[derive(Default)]
pub struct FileMetadata {
pub tty: TtyMetadata,
}
#[derive(Debug)]
pub struct WriteOnlyResource<S> {
stream: AsyncRefCell<S>,
}
impl<S: 'static> From<S> for WriteOnlyResource<S> {
fn from(stream: S) -> Self {
Self {
stream: stream.into(),
}
}
}
impl<S> WriteOnlyResource<S>
where
S: AsyncWrite + Unpin + 'static,
{
pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> {
RcRef::map(self, |r| &r.stream).borrow_mut()
}
async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let mut stream = self.borrow_mut().await;
let nwritten = stream.write(data).await?;
Ok(nwritten)
}
async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> {
let mut stream = self.borrow_mut().await;
stream.shutdown().await?;
Ok(())
}
pub fn into_inner(self) -> S {
self.stream.into_inner()
}
}
#[derive(Debug)]
pub struct ReadOnlyResource<S> {
stream: AsyncRefCell<S>,
cancel_handle: CancelHandle,
}
impl<S: 'static> From<S> for ReadOnlyResource<S> {
fn from(stream: S) -> Self {
Self {
stream: stream.into(),
cancel_handle: Default::default(),
}
}
}
impl<S> ReadOnlyResource<S>
where
S: AsyncRead + Unpin + 'static,
{
pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> {
RcRef::map(self, |r| &r.stream).borrow_mut()
}
pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
RcRef::map(self, |r| &r.cancel_handle)
}
pub fn cancel_read_ops(&self) {
self.cancel_handle.cancel()
}
async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> {
let mut rd = self.borrow_mut().await;
let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?;
Ok(nread)
}
pub fn into_inner(self) -> S {
self.stream.into_inner()
}
}
pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>;
impl Resource for ChildStdinResource {
fn name(&self) -> Cow<str> {
"childStdin".into()
}
deno_core::impl_writable!();
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(self.shutdown())
}
}
pub type ChildStdoutResource = ReadOnlyResource<process::ChildStdout>;
impl Resource for ChildStdoutResource {
deno_core::impl_readable_byob!();
fn name(&self) -> Cow<str> {
"childStdout".into()
}
fn close(self: Rc<Self>) {
self.cancel_read_ops();
}
}
pub type ChildStderrResource = ReadOnlyResource<process::ChildStderr>;
impl Resource for ChildStderrResource {
deno_core::impl_readable_byob!();
fn name(&self) -> Cow<str> {
"childStderr".into()
}
fn close(self: Rc<Self>) {
self.cancel_read_ops();
}
}
#[derive(Clone, Copy)]
enum StdFileResourceKind {
File,
// For stdout and stderr, we sometimes instead use std::io::stdout() directly,
// because we get some Windows specific functionality for free by using Rust
// std's wrappers. So we take a bit of a complexity hit in order to not
// have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs
Stdin,
Stdout,
Stderr,
}
struct StdFileResourceInner {
kind: StdFileResourceKind,
file: StdFile,
}
impl StdFileResourceInner {
pub fn file(fs_file: StdFile) -> Self {
StdFileResourceInner {
kind: StdFileResourceKind::File,
file: fs_file,
}
}
pub fn with_file<R>(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R {
f(&mut self.file)
}
pub fn try_clone(&self) -> Result<Self, std::io::Error> {
Ok(Self {
kind: self.kind,
file: self.file.try_clone()?,
})
}
pub fn write_and_maybe_flush(
&mut self,
buf: &[u8],
) -> Result<usize, AnyError> {
// Rust will line buffer and we don't want that behavior
// (see https://github.com/denoland/deno/issues/948), so flush stdout and stderr.
// Although an alternative solution could be to bypass Rust's std by
// using the raw fds/handles, it will cause encoding issues on Windows
// that we get solved for free by using Rust's stdio wrappers (see
// std/src/sys/windows/stdio.rs in Rust's source code).
match self.kind {
StdFileResourceKind::File => Ok(self.file.write(buf)?),
StdFileResourceKind::Stdin => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
// bypass the file and use std::io::stdout()
let mut stdout = std::io::stdout().lock();
let nwritten = stdout.write(buf)?;
stdout.flush()?;
Ok(nwritten)
}
StdFileResourceKind::Stderr => {
// bypass the file and use std::io::stderr()
let mut stderr = std::io::stderr().lock();
let nwritten = stderr.write(buf)?;
stderr.flush()?;
Ok(nwritten)
}
}
}
pub fn write_all_and_maybe_flush(
&mut self,
buf: &[u8],
) -> Result<(), AnyError> {
// this method exists instead of using a `Write` implementation
// so that we can acquire the locks once and do both actions
match self.kind {
StdFileResourceKind::File => Ok(self.file.write_all(buf)?),
StdFileResourceKind::Stdin => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
// bypass the file and use std::io::stdout()
let mut stdout = std::io::stdout().lock();
stdout.write_all(buf)?;
stdout.flush()?;
Ok(())
}
StdFileResourceKind::Stderr => {
// bypass the file and use std::io::stderr()
let mut stderr = std::io::stderr().lock();
stderr.write_all(buf)?;
stderr.flush()?;
Ok(())
}
}
}
}
impl Read for StdFileResourceInner {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
self.file.read(buf)
}
StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => {
Err(ErrorKind::Unsupported.into())
}
}
}
}
struct StdFileResourceCellValue {
inner: StdFileResourceInner,
meta_data: Arc<Mutex<FileMetadata>>,
}
impl StdFileResourceCellValue {
pub fn try_clone(&self) -> Result<Self, std::io::Error> {
Ok(Self {
inner: self.inner.try_clone()?,
meta_data: self.meta_data.clone(),
})
}
}
pub struct StdFileResource {
name: String,
// We can't use an AsyncRefCell here because we need to allow
// access to the resource synchronously at any time and
// asynchronously one at a time in order
cell: RefCell<Option<StdFileResourceCellValue>>,
// Used to keep async actions in order and only allow one
// to occur at a time
cell_async_task_queue: TaskQueue,
}
impl StdFileResource {
fn stdio(inner: StdFileResourceInner, name: &str) -> Self {
Self {
cell: RefCell::new(Some(StdFileResourceCellValue {
inner,
meta_data: Default::default(),
})),
cell_async_task_queue: Default::default(),
name: name.to_string(),
}
}
pub fn fs_file(fs_file: StdFile) -> Self {
Self {
cell: RefCell::new(Some(StdFileResourceCellValue {
inner: StdFileResourceInner::file(fs_file),
meta_data: Default::default(),
})),
cell_async_task_queue: Default::default(),
name: "fsFile".to_string(),
}
}
fn with_inner_and_metadata<TResult>(
self: Rc<Self>,
action: impl FnOnce(
&mut StdFileResourceInner,
&Arc<Mutex<FileMetadata>>,
) -> Result<TResult, AnyError>,
) -> Result<TResult, AnyError> {
match self.cell.try_borrow_mut() {
Ok(mut cell) => {
let mut file = cell.take().unwrap();
let result = action(&mut file.inner, &file.meta_data);
cell.replace(file);
result
}
Err(_) => Err(resource_unavailable()),
}
}
async fn with_inner_blocking_task<F, R: Send + 'static>(
self: Rc<Self>,
action: F,
) -> R
where
F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static,
{
// we want to restrict this to one async action at a time
let _permit = self.cell_async_task_queue.acquire().await;
// we take the value out of the cell, use it on a blocking task,
// then put it back into the cell when we're done
let mut did_take = false;
let mut cell_value = {
let mut cell = self.cell.borrow_mut();
match cell.as_mut().unwrap().try_clone() {
Ok(value) => value,
Err(_) => {
did_take = true;
cell.take().unwrap()
}
}
};
let (cell_value, result) = tokio::task::spawn_blocking(move || {
let result = action(&mut cell_value.inner);
(cell_value, result)
})
.await
.unwrap();
if did_take {
// put it back
self.cell.borrow_mut().replace(cell_value);
}
result
}
async fn read_byob(
self: Rc<Self>,
mut buf: BufMutView,
) -> Result<(usize, BufMutView), AnyError> {
self
.with_inner_blocking_task(move |inner| {
let nread = inner.read(&mut buf)?;
Ok((nread, buf))
})
.await
}
async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let buf = data.to_owned();
self
.with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf))
.await
}
async fn write_all(self: Rc<Self>, data: &[u8]) -> Result<(), AnyError> {
let buf = data.to_owned();
self
.with_inner_blocking_task(move |inner| {
inner.write_all_and_maybe_flush(&buf)
})
.await
}
fn with_resource<F, R>(
state: &mut OpState,
rid: ResourceId,
f: F,
) -> Result<R, AnyError>
where
F: FnOnce(Rc<StdFileResource>) -> Result<R, AnyError>,
{
let resource = state.resource_table.get::<StdFileResource>(rid)?;
f(resource)
}
pub fn with_file<F, R>(
state: &mut OpState,
rid: ResourceId,
f: F,
) -> Result<R, AnyError>
where
F: FnOnce(&mut StdFile) -> Result<R, AnyError>,
{
Self::with_resource(state, rid, move |resource| {
resource.with_inner_and_metadata(move |inner, _| inner.with_file(f))
})
}
pub fn with_file_and_metadata<F, R>(
state: &mut OpState,
rid: ResourceId,
f: F,
) -> Result<R, AnyError>
where
F: FnOnce(&mut StdFile, &Arc<Mutex<FileMetadata>>) -> Result<R, AnyError>,
{
Self::with_resource(state, rid, move |resource| {
resource.with_inner_and_metadata(move |inner, metadata| {
inner.with_file(move |file| f(file, metadata))
})
})
}
pub async fn with_file_blocking_task<F, R: Send + 'static>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
f: F,
) -> Result<R, AnyError>
where
F: (FnOnce(&mut StdFile) -> Result<R, AnyError>) + Send + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<StdFileResource>(rid)?;
resource
.with_inner_blocking_task(move |inner| inner.with_file(f))
.await
}
pub fn clone_file(
state: &mut OpState,
rid: ResourceId,
) -> Result<StdFile, AnyError> {
Self::with_file(state, rid, move |std_file| {
std_file.try_clone().map_err(AnyError::from)
})
}
pub fn as_stdio(
state: &mut OpState,
rid: u32,
) -> Result<std::process::Stdio, AnyError> {
Self::with_resource(state, rid, |resource| {
resource.with_inner_and_metadata(|inner, _| match inner.kind {
StdFileResourceKind::File => {
let file = inner.file.try_clone()?;
Ok(file.into())
}
_ => Ok(std::process::Stdio::inherit()),
})
})
}
}
impl Resource for StdFileResource {
fn name(&self) -> Cow<str> {
self.name.as_str().into()
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<deno_core::BufView> {
Box::pin(async move {
let vec = vec![0; limit];
let buf = BufMutView::from(vec);
let (nread, buf) = self.read_byob(buf).await?;
let mut vec = buf.unwrap_vec();
if vec.len() != nread {
vec.truncate(nread);
}
Ok(BufView::from(vec))
})
}
fn read_byob(
self: Rc<Self>,
buf: deno_core::BufMutView,
) -> AsyncResult<(usize, deno_core::BufMutView)> {
Box::pin(self.read_byob(buf))
}
deno_core::impl_writable!(with_all);
#[cfg(unix)]
fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> {
use std::os::unix::io::AsRawFd;
self
.with_inner_and_metadata(move |std_file, _| {
Ok(std_file.with_file(|f| f.as_raw_fd()))
})
.ok()
}
}
// override op_print to use the stdout and stderr in the resource table
#[op]
pub fn op_print(
state: &mut OpState,
msg: String,
is_err: bool,
) -> Result<(), AnyError> {
let rid = if is_err { 2 } else { 1 };
StdFileResource::with_resource(state, rid, move |resource| {
resource.with_inner_and_metadata(|inner, _| {
inner.write_all_and_maybe_flush(msg.as_bytes())?;
Ok(())
})
})
}
#[op]
fn op_read_sync(
state: &mut OpState,
rid: ResourceId,
mut buf: ZeroCopyBuf,
) -> Result<u32, AnyError> {
StdFileResource::with_resource(state, rid, move |resource| {
resource.with_inner_and_metadata(|inner, _| {
inner
.read(&mut buf)
.map(|n: usize| n as u32)
.map_err(AnyError::from)
})
})
}
#[op]
fn op_write_sync(
state: &mut OpState,
rid: ResourceId,
buf: ZeroCopyBuf,
) -> Result<u32, AnyError> {
StdFileResource::with_resource(state, rid, move |resource| {
resource.with_inner_and_metadata(|inner, _| {
inner
.write_and_maybe_flush(&buf)
.map(|nwritten: usize| nwritten as u32)
.map_err(AnyError::from)
})
})
}