1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-23 07:44:48 -05:00

refactor: use TaskQueue from deno_unsync (#20485)

This commit is contained in:
David Sherret 2023-09-13 23:36:24 -04:00 committed by GitHub
parent a4b7d563c4
commit e60cbfadc0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 35 deletions

17
Cargo.lock generated
View file

@ -1074,14 +1074,14 @@ dependencies = [
[[package]]
name = "deno_core"
version = "0.210.0"
version = "0.211.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1abaede1f57501d6c92075d678f522aea22794b82523e67db8130ca1a6a1cb5d"
checksum = "2e63f2803555dae13a88f66e0d6b97a5339753fc86651b118f56de3733ba2f57"
dependencies = [
"anyhow",
"bytes",
"deno_ops",
"deno_unsync",
"deno_unsync 0.2.1",
"futures",
"indexmap 2.0.0",
"libc",
@ -1299,7 +1299,7 @@ dependencies = [
"base64 0.13.1",
"chrono",
"deno_core",
"deno_unsync",
"deno_unsync 0.1.1",
"hex",
"log",
"num-bigint",
@ -1583,6 +1583,15 @@ dependencies = [
"tokio",
]
[[package]]
name = "deno_unsync"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0720e562455d6374a5292baec3fc895ed8bfed0937428e3245e50979924e5b15"
dependencies = [
"tokio",
]
[[package]]
name = "deno_url"
version = "0.118.0"

View file

@ -40,7 +40,7 @@ repository = "https://github.com/denoland/deno"
[workspace.dependencies]
deno_ast = { version = "0.29.1", features = ["transpiling"] }
deno_core = { version = "0.210.0" }
deno_core = { version = "0.211.0" }
deno_runtime = { version = "0.126.0", path = "./runtime" }
napi_sym = { version = "0.48.0", path = "./cli/napi/sym" }

View file

@ -3,6 +3,7 @@
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::unsync::spawn_blocking;
use deno_core::unsync::TaskQueue;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
@ -16,7 +17,6 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceHandle;
use deno_core::ResourceHandleFd;
use deno_core::TaskQueue;
use fs::FileResource;
use fs::FsError;
use fs::FsResult;
@ -26,6 +26,7 @@ use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::File as StdFile;
use std::future::Future;
use std::io;
use std::io::ErrorKind;
use std::io::Read;
@ -309,7 +310,7 @@ pub struct StdFileResourceInner {
cell: RefCell<Option<StdFile>>,
// Used to keep async actions in order and only allow one
// to occur at a time
cell_async_task_queue: TaskQueue,
cell_async_task_queue: Rc<TaskQueue>,
handle: ResourceHandleFd,
}
@ -339,48 +340,60 @@ impl StdFileResourceInner {
}
}
async fn with_inner_blocking_task<F, R: 'static + Send>(&self, action: F) -> R
fn with_inner_blocking_task<F, R: 'static + Send>(
&self,
action: F,
) -> impl Future<Output = R> + '_
where
F: FnOnce(&mut StdFile) -> R + Send + 'static,
{
// we want to restrict this to one async action at a time
let _permit = self.cell_async_task_queue.acquire().await;
// we take the value out of the cell, use it on a blocking task,
// then put it back into the cell when we're done
let mut did_take = false;
let mut cell_value = {
let mut cell = self.cell.borrow_mut();
match cell.as_mut().unwrap().try_clone().ok() {
Some(value) => value,
None => {
did_take = true;
cell.take().unwrap()
let acquire_fut = self.cell_async_task_queue.acquire();
async move {
let permit = acquire_fut.await;
// we take the value out of the cell, use it on a blocking task,
// then put it back into the cell when we're done
let mut did_take = false;
let mut cell_value = {
let mut cell = self.cell.borrow_mut();
match cell.as_mut().unwrap().try_clone().ok() {
Some(value) => value,
None => {
did_take = true;
cell.take().unwrap()
}
}
};
let (cell_value, result) = spawn_blocking(move || {
let result = action(&mut cell_value);
(cell_value, result)
})
.await
.unwrap();
if did_take {
// put it back
self.cell.borrow_mut().replace(cell_value);
}
};
let (cell_value, result) = spawn_blocking(move || {
let result = action(&mut cell_value);
(cell_value, result)
})
.await
.unwrap();
if did_take {
// put it back
self.cell.borrow_mut().replace(cell_value);
drop(permit); // explicit for clarity
result
}
result
}
async fn with_blocking_task<F, R: 'static + Send>(&self, action: F) -> R
fn with_blocking_task<F, R: 'static + Send>(
&self,
action: F,
) -> impl Future<Output = R>
where
F: FnOnce() -> R + Send + 'static,
{
// we want to restrict this to one async action at a time
let _permit = self.cell_async_task_queue.acquire().await;
spawn_blocking(action).await.unwrap()
let acquire_fut = self.cell_async_task_queue.acquire();
async move {
let _permit = acquire_fut.await;
spawn_blocking(action).await.unwrap()
}
}
}