1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

perf(runtime): bypass tokio file and bump op buffer size to 64K (#14319)

This commit is contained in:
Divy Srivastava 2022-04-19 20:11:31 +05:30 committed by GitHub
parent 3d1123f8b0
commit 1c05e41f37
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 162 deletions

View file

@ -180,11 +180,14 @@ itest!(ops_sanitizer_multiple_timeout_tests_no_trace {
output: "test/ops_sanitizer_multiple_timeout_tests_no_trace.out",
});
itest!(ops_sanitizer_missing_details {
args: "test --allow-write --allow-read test/ops_sanitizer_missing_details.ts",
exit_code: 1,
output: "test/ops_sanitizer_missing_details.out",
});
// TODO(@littledivy): re-enable this test, recent optimizations made output non deterministic.
// https://github.com/denoland/deno/issues/14268
//
// itest!(ops_sanitizer_missing_details {
// args: "test --allow-write --allow-read test/ops_sanitizer_missing_details.ts",
// exit_code: 1,
// output: "test/ops_sanitizer_missing_details.out",
// });
itest!(ops_sanitizer_nexttick {
args: "test test/ops_sanitizer_nexttick.ts",

View file

@ -65,7 +65,7 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
const DEFAULT_CHUNK_SIZE = 16_640;
const DEFAULT_CHUNK_SIZE = 64 * 1024;
function tryClose(rid) {
try {

View file

@ -114,7 +114,7 @@
return core.write(rid, data);
}
const READ_PER_ITER = 16 * 1024; // 16kb, see https://github.com/denoland/deno/issues/10157
const READ_PER_ITER = 64 * 1024; // 64kb
function readAll(r) {
return readAllInner(r);

View file

@ -15,7 +15,6 @@ use deno_core::ZeroCopyBuf;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::ResourceId;
use deno_crypto::rand::thread_rng;
use deno_crypto::rand::Rng;
@ -32,7 +31,6 @@ use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::io::AsyncSeekExt;
#[cfg(not(unix))]
use deno_core::error::generic_error;
@ -172,8 +170,7 @@ fn op_open_sync(
let std_file = open_options.open(&path).map_err(|err| {
Error::new(err.kind(), format!("{}, open '{}'", err, path.display()))
})?;
let tokio_file = tokio::fs::File::from_std(std_file);
let resource = StdFileResource::fs_file(tokio_file);
let resource = StdFileResource::fs_file(std_file);
let rid = state.resource_table.add(resource);
Ok(rid)
}
@ -184,14 +181,13 @@ async fn op_open_async(
args: OpenArgs,
) -> Result<ResourceId, AnyError> {
let (path, open_options) = open_helper(&mut state.borrow_mut(), &args)?;
let tokio_file = tokio::fs::OpenOptions::from(open_options)
.open(&path)
.await
.map_err(|err| {
let std_file = tokio::task::spawn_blocking(move || {
open_options.open(path.clone()).map_err(|err| {
Error::new(err.kind(), format!("{}, open '{}'", err, path.display()))
})?;
let resource = StdFileResource::fs_file(tokio_file);
})
})
.await?;
let resource = StdFileResource::fs_file(std_file?);
let rid = state.borrow_mut().resource_table.add(resource);
Ok(rid)
}
@ -342,12 +338,15 @@ async fn op_seek_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
let pos = (*fs_file).0.as_mut().unwrap().seek(seek_from).await?;
Ok(pos)
tokio::task::spawn_blocking(move || {
let mut std_file = std_file.lock().unwrap();
std_file.seek(seek_from)
})
.await?
.map_err(AnyError::from)
}
#[op]
@ -376,12 +375,15 @@ async fn op_fdatasync_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
(*fs_file).0.as_mut().unwrap().sync_data().await?;
Ok(())
tokio::task::spawn_blocking(move || {
let std_file = std_file.lock().unwrap();
std_file.sync_data()
})
.await?
.map_err(AnyError::from)
}
#[op]
@ -407,12 +409,15 @@ async fn op_fsync_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
(*fs_file).0.as_mut().unwrap().sync_all().await?;
Ok(())
tokio::task::spawn_blocking(move || {
let std_file = std_file.lock().unwrap();
std_file.sync_all()
})
.await?
.map_err(AnyError::from)
}
#[op]
@ -441,11 +446,15 @@ async fn op_fstat_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
let metadata = (*fs_file).0.as_mut().unwrap().metadata().await?;
let metadata = tokio::task::spawn_blocking(move || {
let std_file = std_file.lock().unwrap();
std_file.metadata()
})
.await?
.map_err(AnyError::from)?;
Ok(get_stat(metadata))
}
@ -489,19 +498,11 @@ async fn op_flock_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
let std_file = (*fs_file)
.0
.as_mut()
.unwrap()
.try_clone()
.await?
.into_std()
.await;
tokio::task::spawn_blocking(move || -> Result<(), AnyError> {
let std_file = std_file.lock().unwrap();
if exclusive {
std_file.lock_exclusive()?;
} else {
@ -546,19 +547,11 @@ async fn op_funlock_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
let std_file = (*fs_file)
.0
.as_mut()
.unwrap()
.try_clone()
.await?
.into_std()
.await;
tokio::task::spawn_blocking(move || -> Result<(), AnyError> {
let std_file = std_file.lock().unwrap();
std_file.unlock()?;
Ok(())
})
@ -1642,12 +1635,15 @@ async fn op_ftruncate_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
(*fs_file).0.as_mut().unwrap().set_len(len).await?;
Ok(())
tokio::task::spawn_blocking(move || {
let std_file = std_file.lock().unwrap();
std_file.set_len(len)
})
.await?
.map_err(AnyError::from)
}
#[derive(Deserialize)]
@ -1941,20 +1937,10 @@ async fn op_futime_async(
return Err(bad_resource_id());
}
let mut fs_file = RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let std_file = (*fs_file)
.0
.as_mut()
.unwrap()
.try_clone()
.await?
.into_std()
.await;
let fs_file = resource.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
tokio::task::spawn_blocking(move || {
let std_file = std_file.lock().unwrap();
filetime::set_file_handle_times(&std_file, Some(atime), Some(mtime))?;
Ok(())
})

View file

@ -17,10 +17,13 @@ use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::File as StdFile;
use std::io::Read;
use std::io::Write;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
@ -225,10 +228,11 @@ impl Resource for ChildStderrResource {
}
}
#[derive(Debug, Default)]
type MaybeSharedStdFile = Option<Arc<Mutex<StdFile>>>;
#[derive(Default)]
pub struct StdFileResource {
pub fs_file:
Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>,
pub fs_file: Option<(MaybeSharedStdFile, Option<RefCell<FileMetadata>>)>,
cancel: CancelHandle,
name: String,
}
@ -236,21 +240,21 @@ pub struct StdFileResource {
impl StdFileResource {
pub fn stdio(std_file: &StdFile, name: &str) -> Self {
Self {
fs_file: Some(AsyncRefCell::new((
std_file.try_clone().map(tokio::fs::File::from_std).ok(),
Some(FileMetadata::default()),
))),
fs_file: Some((
std_file.try_clone().map(|s| Arc::new(Mutex::new(s))).ok(),
Some(RefCell::new(FileMetadata::default())),
)),
name: name.to_string(),
..Default::default()
}
}
pub fn fs_file(fs_file: tokio::fs::File) -> Self {
pub fn fs_file(fs_file: StdFile) -> Self {
Self {
fs_file: Some(AsyncRefCell::new((
Some(fs_file),
Some(FileMetadata::default()),
))),
fs_file: Some((
Some(Arc::new(Mutex::new(fs_file))),
Some(RefCell::new(FileMetadata::default())),
)),
name: "fsFile".to_string(),
..Default::default()
}
@ -261,11 +265,14 @@ impl StdFileResource {
mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> {
if self.fs_file.is_some() {
let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let nwritten = fs_file.0.as_mut().unwrap().read(&mut buf).await?;
Ok(nwritten)
let fs_file = self.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
tokio::task::spawn_blocking(move || {
let mut std_file = std_file.lock().unwrap();
std_file.read(&mut buf)
})
.await?
.map_err(AnyError::from)
} else {
Err(resource_unavailable())
}
@ -273,12 +280,14 @@ impl StdFileResource {
async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, AnyError> {
if self.fs_file.is_some() {
let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
let nwritten = fs_file.0.as_mut().unwrap().write(&buf).await?;
fs_file.0.as_mut().unwrap().flush().await?;
Ok(nwritten)
let fs_file = self.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
tokio::task::spawn_blocking(move || {
let mut std_file = std_file.lock().unwrap();
std_file.write(&buf)
})
.await?
.map_err(AnyError::from)
} else {
Err(resource_unavailable())
}
@ -292,42 +301,18 @@ impl StdFileResource {
where
F: FnMut(Result<&mut std::fs::File, ()>) -> Result<R, AnyError>,
{
// First we look up the rid in the resource table.
let resource = state.resource_table.get::<StdFileResource>(rid)?;
// TODO(@AaronO): does this make sense ?
// Sync write only works for FsFile. It doesn't make sense to do this
// for non-blocking sockets. So we error out if not FsFile.
if resource.fs_file.is_none() {
return f(Err(()));
}
// The object in the resource table is a tokio::fs::File - but in
// order to do a blocking write on it, we must turn it into a
// std::fs::File. Hopefully this code compiles down to nothing.
let fs_file_resource =
RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
if let Some(mut fs_file) = fs_file_resource {
let tokio_file = fs_file.0.take().unwrap();
match tokio_file.try_into_std() {
Ok(mut std_file) => {
let result = f(Ok(&mut std_file));
// Turn the std_file handle back into a tokio file, put it back
// in the resource table.
let tokio_file = tokio::fs::File::from_std(std_file);
fs_file.0 = Some(tokio_file);
// return the result.
result
}
Err(tokio_file) => {
// This function will return an error containing the file if
// some operation is in-flight.
fs_file.0 = Some(tokio_file);
Err(resource_unavailable())
}
}
} else {
Err(resource_unavailable())
let (r, _) = resource.fs_file.as_ref().unwrap();
match r {
Some(r) => f(Ok(&mut r.as_ref().lock().unwrap())),
None => Err(resource_unavailable()),
}
}
}

View file

@ -8,7 +8,6 @@ use deno_core::error::AnyError;
use deno_core::op;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::ResourceId;
use serde::Deserialize;
use serde::Serialize;
@ -96,28 +95,11 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> {
return Err(bad_resource_id());
}
let fs_file_resource =
RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
let handle_result = if let Some(mut fs_file) = fs_file_resource {
let tokio_file = fs_file.0.take().unwrap();
match tokio_file.try_into_std() {
Ok(std_file) => {
let handle_result = if let Some((fs_file, _)) = &resource.fs_file {
let file = fs_file.as_ref().unwrap().clone();
let std_file = file.lock().unwrap();
let raw_handle = std_file.as_raw_handle();
// Turn the std_file handle back into a tokio file, put it back
// in the resource table.
let tokio_file = tokio::fs::File::from_std(std_file);
fs_file.0 = Some(tokio_file);
// return the result.
Ok(raw_handle)
}
Err(tokio_file) => {
// This function will return an error containing the file if
// some operation is in-flight.
fs_file.0 = Some(tokio_file);
Err(resource_unavailable())
}
}
} else {
Err(resource_unavailable())
};
@ -156,20 +138,15 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> {
return Err(not_supported());
}
let maybe_fs_file_resource =
RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
let (fs_file, meta) =
resource.fs_file.as_ref().ok_or_else(resource_unavailable)?;
if maybe_fs_file_resource.is_none() {
if fs_file.is_none() {
return Err(resource_unavailable());
}
let mut fs_file_resource = maybe_fs_file_resource.unwrap();
if fs_file_resource.0.is_none() {
return Err(resource_unavailable());
}
let raw_fd = fs_file_resource.0.as_ref().unwrap().as_raw_fd();
let maybe_tty_mode = &mut fs_file_resource.1.as_mut().unwrap().tty.mode;
let raw_fd = fs_file.as_ref().unwrap().lock().unwrap().as_raw_fd();
let maybe_tty_mode = &mut meta.as_ref().unwrap().borrow_mut().tty.mode;
if is_raw {
if maybe_tty_mode.is_none() {