mirror of
https://github.com/denoland/deno.git
synced 2024-12-24 08:09:08 -05:00
refactor(core): bake single-thread assumptions into spawn/spawn_blocking (#19056)
Partially supersedes #19016. This migrates `spawn` and `spawn_blocking` to `deno_core`, and removes the requirement for `spawn` tasks to be `Send` given our single-threaded executor. While we don't need to technically do anything w/`spawn_blocking`, this allows us to have a single `JoinHandle` type that works for both cases, and allows us to more easily experiment with alternative `spawn_blocking` implementations that do not require tokio (ie: rayon). Async ops (+~35%): Before: ``` time 1310 ms rate 763358 time 1267 ms rate 789265 time 1259 ms rate 794281 time 1266 ms rate 789889 ``` After: ``` time 956 ms rate 1046025 time 954 ms rate 1048218 time 924 ms rate 1082251 time 920 ms rate 1086956 ``` HTTP serve (+~4.4%): Before: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 68.78us 19.77us 1.43ms 86.84% Req/Sec 68.78k 5.00k 73.84k 91.58% 1381833 requests in 10.10s, 167.36MB read Requests/sec: 136823.29 Transfer/sec: 16.57MB ``` After: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 63.12us 17.43us 1.11ms 85.13% Req/Sec 71.82k 3.71k 77.02k 79.21% 1443195 requests in 10.10s, 174.79MB read Requests/sec: 142921.99 Transfer/sec: 17.31MB ``` Suggested-By: alice@ryhl.io Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
e61ee44d69
commit
3392a8e530
42 changed files with 415 additions and 288 deletions
5
cli/cache/cache_db.rs
vendored
5
cli/cache/cache_db.rs
vendored
|
@ -3,6 +3,7 @@
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
use deno_core::parking_lot::MutexGuard;
|
use deno_core::parking_lot::MutexGuard;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_runtime::deno_webstorage::rusqlite;
|
use deno_runtime::deno_webstorage::rusqlite;
|
||||||
use deno_runtime::deno_webstorage::rusqlite::Connection;
|
use deno_runtime::deno_webstorage::rusqlite::Connection;
|
||||||
use deno_runtime::deno_webstorage::rusqlite::OptionalExtension;
|
use deno_runtime::deno_webstorage::rusqlite::OptionalExtension;
|
||||||
|
@ -95,7 +96,7 @@ impl Drop for CacheDB {
|
||||||
// Hand off SQLite connection to another thread to do the surprisingly expensive cleanup
|
// Hand off SQLite connection to another thread to do the surprisingly expensive cleanup
|
||||||
let inner = inner.into_inner().into_inner();
|
let inner = inner.into_inner().into_inner();
|
||||||
if let Some(conn) = inner {
|
if let Some(conn) = inner {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
drop(conn);
|
drop(conn);
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"Cleaned up SQLite connection at {}",
|
"Cleaned up SQLite connection at {}",
|
||||||
|
@ -168,7 +169,7 @@ impl CacheDB {
|
||||||
fn spawn_eager_init_thread(&self) {
|
fn spawn_eager_init_thread(&self) {
|
||||||
let clone = self.clone();
|
let clone = self.clone();
|
||||||
debug_assert!(tokio::runtime::Handle::try_current().is_ok());
|
debug_assert!(tokio::runtime::Handle::try_current().is_ok());
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let lock = clone.conn.lock();
|
let lock = clone.conn.lock();
|
||||||
clone.initialize(&lock);
|
clone.initialize(&lock);
|
||||||
});
|
});
|
||||||
|
|
5
cli/cache/incremental.rs
vendored
5
cli/cache/incremental.rs
vendored
|
@ -7,9 +7,10 @@ use std::path::PathBuf;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
use deno_core::serde_json;
|
use deno_core::serde_json;
|
||||||
|
use deno_core::task::spawn;
|
||||||
|
use deno_core::task::JoinHandle;
|
||||||
use deno_runtime::deno_webstorage::rusqlite::params;
|
use deno_runtime::deno_webstorage::rusqlite::params;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
use super::cache_db::CacheDB;
|
use super::cache_db::CacheDB;
|
||||||
use super::cache_db::CacheDBConfiguration;
|
use super::cache_db::CacheDBConfiguration;
|
||||||
|
@ -93,7 +94,7 @@ impl IncrementalCacheInner {
|
||||||
tokio::sync::mpsc::unbounded_channel::<ReceiverMessage>();
|
tokio::sync::mpsc::unbounded_channel::<ReceiverMessage>();
|
||||||
|
|
||||||
// sqlite isn't `Sync`, so we do all the updating on a dedicated task
|
// sqlite isn't `Sync`, so we do all the updating on a dedicated task
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = spawn(async move {
|
||||||
while let Some(message) = receiver.recv().await {
|
while let Some(message) = receiver.recv().await {
|
||||||
match message {
|
match message {
|
||||||
ReceiverMessage::Update(path, hash) => {
|
ReceiverMessage::Update(path, hash) => {
|
||||||
|
|
|
@ -8,6 +8,7 @@ use deno_core::anyhow::bail;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::serde_json;
|
use deno_core::serde_json;
|
||||||
use deno_core::serde_json::Value;
|
use deno_core::serde_json::Value;
|
||||||
|
use deno_core::task::spawn;
|
||||||
use tower_lsp::lsp_types as lsp;
|
use tower_lsp::lsp_types as lsp;
|
||||||
use tower_lsp::lsp_types::ConfigurationItem;
|
use tower_lsp::lsp_types::ConfigurationItem;
|
||||||
|
|
||||||
|
@ -56,7 +57,7 @@ impl Client {
|
||||||
) {
|
) {
|
||||||
// do on a task in case the caller currently is in the lsp lock
|
// do on a task in case the caller currently is in the lsp lock
|
||||||
let client = self.0.clone();
|
let client = self.0.clone();
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
client.send_registry_state_notification(params).await;
|
client.send_registry_state_notification(params).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -64,7 +65,7 @@ impl Client {
|
||||||
pub fn send_test_notification(&self, params: TestingNotification) {
|
pub fn send_test_notification(&self, params: TestingNotification) {
|
||||||
// do on a task in case the caller currently is in the lsp lock
|
// do on a task in case the caller currently is in the lsp lock
|
||||||
let client = self.0.clone();
|
let client = self.0.clone();
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
client.send_test_notification(params).await;
|
client.send_test_notification(params).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -77,7 +78,7 @@ impl Client {
|
||||||
// do on a task in case the caller currently is in the lsp lock
|
// do on a task in case the caller currently is in the lsp lock
|
||||||
let client = self.0.clone();
|
let client = self.0.clone();
|
||||||
let message = message.to_string();
|
let message = message.to_string();
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
client.show_message(message_type, message).await;
|
client.show_message(message_type, message).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,8 @@ use deno_core::resolve_url;
|
||||||
use deno_core::serde::Deserialize;
|
use deno_core::serde::Deserialize;
|
||||||
use deno_core::serde_json;
|
use deno_core::serde_json;
|
||||||
use deno_core::serde_json::json;
|
use deno_core::serde_json::json;
|
||||||
|
use deno_core::task::spawn;
|
||||||
|
use deno_core::task::JoinHandle;
|
||||||
use deno_core::ModuleSpecifier;
|
use deno_core::ModuleSpecifier;
|
||||||
use deno_graph::Resolution;
|
use deno_graph::Resolution;
|
||||||
use deno_graph::ResolutionError;
|
use deno_graph::ResolutionError;
|
||||||
|
@ -197,9 +199,9 @@ impl DiagnosticsServer {
|
||||||
|
|
||||||
runtime.block_on(async {
|
runtime.block_on(async {
|
||||||
let mut token = CancellationToken::new();
|
let mut token = CancellationToken::new();
|
||||||
let mut ts_handle: Option<tokio::task::JoinHandle<()>> = None;
|
let mut ts_handle: Option<JoinHandle<()>> = None;
|
||||||
let mut lint_handle: Option<tokio::task::JoinHandle<()>> = None;
|
let mut lint_handle: Option<JoinHandle<()>> = None;
|
||||||
let mut deps_handle: Option<tokio::task::JoinHandle<()>> = None;
|
let mut deps_handle: Option<JoinHandle<()>> = None;
|
||||||
let diagnostics_publisher = DiagnosticsPublisher::new(client.clone());
|
let diagnostics_publisher = DiagnosticsPublisher::new(client.clone());
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -213,7 +215,7 @@ impl DiagnosticsServer {
|
||||||
diagnostics_publisher.clear().await;
|
diagnostics_publisher.clear().await;
|
||||||
|
|
||||||
let previous_ts_handle = ts_handle.take();
|
let previous_ts_handle = ts_handle.take();
|
||||||
ts_handle = Some(tokio::spawn({
|
ts_handle = Some(spawn({
|
||||||
let performance = performance.clone();
|
let performance = performance.clone();
|
||||||
let diagnostics_publisher = diagnostics_publisher.clone();
|
let diagnostics_publisher = diagnostics_publisher.clone();
|
||||||
let ts_server = ts_server.clone();
|
let ts_server = ts_server.clone();
|
||||||
|
@ -265,7 +267,7 @@ impl DiagnosticsServer {
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let previous_deps_handle = deps_handle.take();
|
let previous_deps_handle = deps_handle.take();
|
||||||
deps_handle = Some(tokio::spawn({
|
deps_handle = Some(spawn({
|
||||||
let performance = performance.clone();
|
let performance = performance.clone();
|
||||||
let diagnostics_publisher = diagnostics_publisher.clone();
|
let diagnostics_publisher = diagnostics_publisher.clone();
|
||||||
let token = token.clone();
|
let token = token.clone();
|
||||||
|
@ -293,7 +295,7 @@ impl DiagnosticsServer {
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let previous_lint_handle = lint_handle.take();
|
let previous_lint_handle = lint_handle.take();
|
||||||
lint_handle = Some(tokio::spawn({
|
lint_handle = Some(spawn({
|
||||||
let performance = performance.clone();
|
let performance = performance.clone();
|
||||||
let diagnostics_publisher = diagnostics_publisher.clone();
|
let diagnostics_publisher = diagnostics_publisher.clone();
|
||||||
let token = token.clone();
|
let token = token.clone();
|
||||||
|
|
|
@ -8,6 +8,7 @@ use deno_core::resolve_url;
|
||||||
use deno_core::serde_json;
|
use deno_core::serde_json;
|
||||||
use deno_core::serde_json::json;
|
use deno_core::serde_json::json;
|
||||||
use deno_core::serde_json::Value;
|
use deno_core::serde_json::Value;
|
||||||
|
use deno_core::task::spawn;
|
||||||
use deno_core::ModuleSpecifier;
|
use deno_core::ModuleSpecifier;
|
||||||
use deno_runtime::deno_fs;
|
use deno_runtime::deno_fs;
|
||||||
use deno_runtime::deno_node::NodeResolver;
|
use deno_runtime::deno_node::NodeResolver;
|
||||||
|
@ -240,7 +241,7 @@ impl LanguageServer {
|
||||||
let cli_options = result.cli_options;
|
let cli_options = result.cli_options;
|
||||||
let roots = result.roots;
|
let roots = result.roots;
|
||||||
let open_docs = result.open_docs;
|
let open_docs = result.open_docs;
|
||||||
let handle = tokio::task::spawn_local(async move {
|
let handle = spawn(async move {
|
||||||
create_graph_for_caching(cli_options, roots, open_docs).await
|
create_graph_for_caching(cli_options, roots, open_docs).await
|
||||||
});
|
});
|
||||||
if let Err(err) = handle.await.unwrap() {
|
if let Err(err) = handle.await.unwrap() {
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use deno_core::task::spawn;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
|
|
||||||
|
@ -7,7 +8,7 @@ use tokio::time::Duration;
|
||||||
/// provided process id. Once that process no longer exists
|
/// provided process id. Once that process no longer exists
|
||||||
/// it will terminate the current process.
|
/// it will terminate the current process.
|
||||||
pub fn start(parent_process_id: u32) {
|
pub fn start(parent_process_id: u32) {
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
sleep(Duration::from_secs(30)).await;
|
sleep(Duration::from_secs(30)).await;
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,11 @@ use deno_core::futures::stream;
|
||||||
use deno_core::futures::StreamExt;
|
use deno_core::futures::StreamExt;
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
use deno_core::parking_lot::RwLock;
|
use deno_core::parking_lot::RwLock;
|
||||||
|
use deno_core::task::spawn;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::ModuleSpecifier;
|
use deno_core::ModuleSpecifier;
|
||||||
use deno_runtime::permissions::Permissions;
|
use deno_runtime::permissions::Permissions;
|
||||||
use deno_runtime::tokio_util::run_local;
|
use deno_runtime::tokio_util::create_and_run_current_thread;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
@ -284,7 +286,7 @@ impl TestRun {
|
||||||
};
|
};
|
||||||
let token = self.token.clone();
|
let token = self.token.clone();
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
if fail_fast_tracker.should_stop() {
|
if fail_fast_tracker.should_stop() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -292,13 +294,13 @@ impl TestRun {
|
||||||
let file_result = if token.is_cancelled() {
|
let file_result = if token.is_cancelled() {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
run_local(test::test_specifier(
|
create_and_run_current_thread(test::test_specifier(
|
||||||
&worker_factory,
|
worker_factory,
|
||||||
permissions,
|
permissions,
|
||||||
specifier,
|
specifier,
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
fail_fast_tracker,
|
fail_fast_tracker,
|
||||||
&test::TestSpecifierOptions {
|
test::TestSpecifierOptions {
|
||||||
filter,
|
filter,
|
||||||
shuffle: None,
|
shuffle: None,
|
||||||
trace_ops: false,
|
trace_ops: false,
|
||||||
|
@ -331,7 +333,7 @@ impl TestRun {
|
||||||
));
|
));
|
||||||
|
|
||||||
let handler = {
|
let handler = {
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
let earlier = Instant::now();
|
let earlier = Instant::now();
|
||||||
let mut summary = test::TestSummary::new();
|
let mut summary = test::TestSummary::new();
|
||||||
let mut used_only = false;
|
let mut used_only = false;
|
||||||
|
|
|
@ -46,7 +46,7 @@ use deno_core::error::AnyError;
|
||||||
use deno_core::error::JsError;
|
use deno_core::error::JsError;
|
||||||
use deno_runtime::colors;
|
use deno_runtime::colors;
|
||||||
use deno_runtime::fmt_errors::format_js_error;
|
use deno_runtime::fmt_errors::format_js_error;
|
||||||
use deno_runtime::tokio_util::run_local;
|
use deno_runtime::tokio_util::create_and_run_current_thread;
|
||||||
use factory::CliFactory;
|
use factory::CliFactory;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::env::current_exe;
|
use std::env::current_exe;
|
||||||
|
@ -294,7 +294,7 @@ pub fn main() {
|
||||||
run_subcommand(flags).await
|
run_subcommand(flags).await
|
||||||
};
|
};
|
||||||
|
|
||||||
let exit_code = unwrap_or_exit(run_local(future));
|
let exit_code = unwrap_or_exit(create_and_run_current_thread(future));
|
||||||
|
|
||||||
std::process::exit(exit_code);
|
std::process::exit(exit_code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ use async_trait::async_trait;
|
||||||
use deno_ast::ModuleSpecifier;
|
use deno_ast::ModuleSpecifier;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::futures;
|
use deno_core::futures;
|
||||||
|
use deno_core::task::spawn;
|
||||||
use deno_core::url::Url;
|
use deno_core::url::Url;
|
||||||
use deno_npm::NpmPackageId;
|
use deno_npm::NpmPackageId;
|
||||||
use deno_npm::NpmResolutionPackage;
|
use deno_npm::NpmResolutionPackage;
|
||||||
|
@ -71,7 +72,7 @@ pub async fn cache_packages(
|
||||||
assert_eq!(package.copy_index, 0); // the caller should not provide any of these
|
assert_eq!(package.copy_index, 0); // the caller should not provide any of these
|
||||||
let cache = cache.clone();
|
let cache = cache.clone();
|
||||||
let registry_url = registry_url.clone();
|
let registry_url = registry_url.clone();
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = spawn(async move {
|
||||||
cache
|
cache
|
||||||
.ensure_package(&package.pkg_id.nv, &package.dist, ®istry_url)
|
.ensure_package(&package.pkg_id.nv, &package.dist, ®istry_url)
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -18,6 +18,8 @@ use deno_ast::ModuleSpecifier;
|
||||||
use deno_core::anyhow::bail;
|
use deno_core::anyhow::bail;
|
||||||
use deno_core::anyhow::Context;
|
use deno_core::anyhow::Context;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
|
use deno_core::task::spawn;
|
||||||
|
use deno_core::task::JoinHandle;
|
||||||
use deno_core::url::Url;
|
use deno_core::url::Url;
|
||||||
use deno_npm::resolution::NpmResolutionSnapshot;
|
use deno_npm::resolution::NpmResolutionSnapshot;
|
||||||
use deno_npm::NpmPackageCacheFolderId;
|
use deno_npm::NpmPackageCacheFolderId;
|
||||||
|
@ -27,7 +29,6 @@ use deno_runtime::deno_fs;
|
||||||
use deno_runtime::deno_node::NodePermissions;
|
use deno_runtime::deno_node::NodePermissions;
|
||||||
use deno_runtime::deno_node::NodeResolutionMode;
|
use deno_runtime::deno_node::NodeResolutionMode;
|
||||||
use deno_runtime::deno_node::PackageJson;
|
use deno_runtime::deno_node::PackageJson;
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
use crate::npm::cache::mixed_case_package_name_encode;
|
use crate::npm::cache::mixed_case_package_name_encode;
|
||||||
use crate::npm::cache::should_sync_download;
|
use crate::npm::cache::should_sync_download;
|
||||||
|
@ -277,7 +278,7 @@ async fn sync_resolution_with_fs(
|
||||||
let cache = cache.clone();
|
let cache = cache.clone();
|
||||||
let registry_url = registry_url.clone();
|
let registry_url = registry_url.clone();
|
||||||
let package = package.clone();
|
let package = package.clone();
|
||||||
let handle = tokio::task::spawn(async move {
|
let handle = spawn(async move {
|
||||||
cache
|
cache
|
||||||
.ensure_package(&package.pkg_id.nv, &package.dist, ®istry_url)
|
.ensure_package(&package.pkg_id.nv, &package.dist, ®istry_url)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
@ -11,7 +11,6 @@ use std::process::Command;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use test_util as util;
|
use test_util as util;
|
||||||
use test_util::TempDir;
|
use test_util::TempDir;
|
||||||
use tokio::task::LocalSet;
|
|
||||||
use util::TestContext;
|
use util::TestContext;
|
||||||
|
|
||||||
itest_flaky!(cafile_url_imports {
|
itest_flaky!(cafile_url_imports {
|
||||||
|
@ -219,113 +218,99 @@ fn cafile_bundle_remote_exports() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn listen_tls_alpn() {
|
async fn listen_tls_alpn() {
|
||||||
// TLS streams require the presence of an ambient local task set to gracefully
|
let mut child = util::deno_cmd()
|
||||||
// close dropped connections in the background.
|
.current_dir(util::testdata_path())
|
||||||
LocalSet::new()
|
.arg("run")
|
||||||
.run_until(async {
|
.arg("--unstable")
|
||||||
let mut child = util::deno_cmd()
|
.arg("--quiet")
|
||||||
.current_dir(util::testdata_path())
|
.arg("--allow-net")
|
||||||
.arg("run")
|
.arg("--allow-read")
|
||||||
.arg("--unstable")
|
.arg("./cert/listen_tls_alpn.ts")
|
||||||
.arg("--quiet")
|
.arg("4504")
|
||||||
.arg("--allow-net")
|
.stdout(std::process::Stdio::piped())
|
||||||
.arg("--allow-read")
|
.spawn()
|
||||||
.arg("./cert/listen_tls_alpn.ts")
|
.unwrap();
|
||||||
.arg("4504")
|
let stdout = child.stdout.as_mut().unwrap();
|
||||||
.stdout(std::process::Stdio::piped())
|
let mut msg = [0; 5];
|
||||||
.spawn()
|
let read = stdout.read(&mut msg).unwrap();
|
||||||
.unwrap();
|
assert_eq!(read, 5);
|
||||||
let stdout = child.stdout.as_mut().unwrap();
|
assert_eq!(&msg, b"READY");
|
||||||
let mut msg = [0; 5];
|
|
||||||
let read = stdout.read(&mut msg).unwrap();
|
|
||||||
assert_eq!(read, 5);
|
|
||||||
assert_eq!(&msg, b"READY");
|
|
||||||
|
|
||||||
let mut reader = &mut BufReader::new(Cursor::new(include_bytes!(
|
let mut reader = &mut BufReader::new(Cursor::new(include_bytes!(
|
||||||
"../testdata/tls/RootCA.crt"
|
"../testdata/tls/RootCA.crt"
|
||||||
)));
|
)));
|
||||||
let certs = rustls_pemfile::certs(&mut reader).unwrap();
|
let certs = rustls_pemfile::certs(&mut reader).unwrap();
|
||||||
let mut root_store = rustls::RootCertStore::empty();
|
let mut root_store = rustls::RootCertStore::empty();
|
||||||
root_store.add_parsable_certificates(&certs);
|
root_store.add_parsable_certificates(&certs);
|
||||||
let mut cfg = rustls::ClientConfig::builder()
|
let mut cfg = rustls::ClientConfig::builder()
|
||||||
.with_safe_defaults()
|
.with_safe_defaults()
|
||||||
.with_root_certificates(root_store)
|
.with_root_certificates(root_store)
|
||||||
.with_no_client_auth();
|
.with_no_client_auth();
|
||||||
cfg.alpn_protocols.push(b"foobar".to_vec());
|
cfg.alpn_protocols.push(b"foobar".to_vec());
|
||||||
let cfg = Arc::new(cfg);
|
let cfg = Arc::new(cfg);
|
||||||
|
|
||||||
let hostname = rustls::ServerName::try_from("localhost").unwrap();
|
let hostname = rustls::ServerName::try_from("localhost").unwrap();
|
||||||
|
|
||||||
let tcp_stream = tokio::net::TcpStream::connect("localhost:4504")
|
let tcp_stream = tokio::net::TcpStream::connect("localhost:4504")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut tls_stream =
|
let mut tls_stream = TlsStream::new_client_side(tcp_stream, cfg, hostname);
|
||||||
TlsStream::new_client_side(tcp_stream, cfg, hostname);
|
|
||||||
|
|
||||||
tls_stream.handshake().await.unwrap();
|
tls_stream.handshake().await.unwrap();
|
||||||
|
|
||||||
let (_, rustls_connection) = tls_stream.get_ref();
|
let (_, rustls_connection) = tls_stream.get_ref();
|
||||||
let alpn = rustls_connection.alpn_protocol().unwrap();
|
let alpn = rustls_connection.alpn_protocol().unwrap();
|
||||||
assert_eq!(alpn, b"foobar");
|
assert_eq!(alpn, b"foobar");
|
||||||
|
|
||||||
let status = child.wait().unwrap();
|
let status = child.wait().unwrap();
|
||||||
assert!(status.success());
|
assert!(status.success());
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn listen_tls_alpn_fail() {
|
async fn listen_tls_alpn_fail() {
|
||||||
// TLS streams require the presence of an ambient local task set to gracefully
|
let mut child = util::deno_cmd()
|
||||||
// close dropped connections in the background.
|
.current_dir(util::testdata_path())
|
||||||
LocalSet::new()
|
.arg("run")
|
||||||
.run_until(async {
|
.arg("--unstable")
|
||||||
let mut child = util::deno_cmd()
|
.arg("--quiet")
|
||||||
.current_dir(util::testdata_path())
|
.arg("--allow-net")
|
||||||
.arg("run")
|
.arg("--allow-read")
|
||||||
.arg("--unstable")
|
.arg("./cert/listen_tls_alpn_fail.ts")
|
||||||
.arg("--quiet")
|
.arg("4505")
|
||||||
.arg("--allow-net")
|
.stdout(std::process::Stdio::piped())
|
||||||
.arg("--allow-read")
|
.spawn()
|
||||||
.arg("./cert/listen_tls_alpn_fail.ts")
|
.unwrap();
|
||||||
.arg("4505")
|
let stdout = child.stdout.as_mut().unwrap();
|
||||||
.stdout(std::process::Stdio::piped())
|
let mut msg = [0; 5];
|
||||||
.spawn()
|
let read = stdout.read(&mut msg).unwrap();
|
||||||
.unwrap();
|
assert_eq!(read, 5);
|
||||||
let stdout = child.stdout.as_mut().unwrap();
|
assert_eq!(&msg, b"READY");
|
||||||
let mut msg = [0; 5];
|
|
||||||
let read = stdout.read(&mut msg).unwrap();
|
|
||||||
assert_eq!(read, 5);
|
|
||||||
assert_eq!(&msg, b"READY");
|
|
||||||
|
|
||||||
let mut reader = &mut BufReader::new(Cursor::new(include_bytes!(
|
let mut reader = &mut BufReader::new(Cursor::new(include_bytes!(
|
||||||
"../testdata/tls/RootCA.crt"
|
"../testdata/tls/RootCA.crt"
|
||||||
)));
|
)));
|
||||||
let certs = rustls_pemfile::certs(&mut reader).unwrap();
|
let certs = rustls_pemfile::certs(&mut reader).unwrap();
|
||||||
let mut root_store = rustls::RootCertStore::empty();
|
let mut root_store = rustls::RootCertStore::empty();
|
||||||
root_store.add_parsable_certificates(&certs);
|
root_store.add_parsable_certificates(&certs);
|
||||||
let mut cfg = rustls::ClientConfig::builder()
|
let mut cfg = rustls::ClientConfig::builder()
|
||||||
.with_safe_defaults()
|
.with_safe_defaults()
|
||||||
.with_root_certificates(root_store)
|
.with_root_certificates(root_store)
|
||||||
.with_no_client_auth();
|
.with_no_client_auth();
|
||||||
cfg.alpn_protocols.push(b"boofar".to_vec());
|
cfg.alpn_protocols.push(b"boofar".to_vec());
|
||||||
let cfg = Arc::new(cfg);
|
let cfg = Arc::new(cfg);
|
||||||
|
|
||||||
let hostname = rustls::ServerName::try_from("localhost").unwrap();
|
let hostname = rustls::ServerName::try_from("localhost").unwrap();
|
||||||
|
|
||||||
let tcp_stream = tokio::net::TcpStream::connect("localhost:4505")
|
let tcp_stream = tokio::net::TcpStream::connect("localhost:4505")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut tls_stream =
|
let mut tls_stream = TlsStream::new_client_side(tcp_stream, cfg, hostname);
|
||||||
TlsStream::new_client_side(tcp_stream, cfg, hostname);
|
|
||||||
|
|
||||||
tls_stream.handshake().await.unwrap_err();
|
tls_stream.handshake().await.unwrap_err();
|
||||||
|
|
||||||
let (_, rustls_connection) = tls_stream.get_ref();
|
let (_, rustls_connection) = tls_stream.get_ref();
|
||||||
assert!(rustls_connection.alpn_protocol().is_none());
|
assert!(rustls_connection.alpn_protocol().is_none());
|
||||||
|
|
||||||
let status = child.wait().unwrap();
|
let status = child.wait().unwrap();
|
||||||
assert!(status.success());
|
assert!(status.success());
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ where
|
||||||
Fut::Output: Send + 'static,
|
Fut::Output: Send + 'static,
|
||||||
{
|
{
|
||||||
fn execute(&self, fut: Fut) {
|
fn execute(&self, fut: Fut) {
|
||||||
tokio::task::spawn(fut);
|
deno_core::task::spawn(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,6 @@ use std::process::Stdio;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use test_util as util;
|
use test_util as util;
|
||||||
use test_util::TempDir;
|
use test_util::TempDir;
|
||||||
use tokio::task::LocalSet;
|
|
||||||
use trust_dns_client::serialize::txt::Lexer;
|
use trust_dns_client::serialize::txt::Lexer;
|
||||||
use trust_dns_client::serialize::txt::Parser;
|
use trust_dns_client::serialize::txt::Parser;
|
||||||
use util::assert_contains;
|
use util::assert_contains;
|
||||||
|
@ -3886,50 +3885,44 @@ async fn test_resolve_dns() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn http2_request_url() {
|
async fn http2_request_url() {
|
||||||
// TLS streams require the presence of an ambient local task set to gracefully
|
let mut child = util::deno_cmd()
|
||||||
// close dropped connections in the background.
|
.current_dir(util::testdata_path())
|
||||||
LocalSet::new()
|
.arg("run")
|
||||||
.run_until(async {
|
.arg("--unstable")
|
||||||
let mut child = util::deno_cmd()
|
.arg("--quiet")
|
||||||
.current_dir(util::testdata_path())
|
.arg("--allow-net")
|
||||||
.arg("run")
|
.arg("--allow-read")
|
||||||
.arg("--unstable")
|
.arg("./run/http2_request_url.ts")
|
||||||
.arg("--quiet")
|
.arg("4506")
|
||||||
.arg("--allow-net")
|
.stdout(std::process::Stdio::piped())
|
||||||
.arg("--allow-read")
|
.spawn()
|
||||||
.arg("./run/http2_request_url.ts")
|
.unwrap();
|
||||||
.arg("4506")
|
let stdout = child.stdout.as_mut().unwrap();
|
||||||
.stdout(std::process::Stdio::piped())
|
let mut buffer = [0; 5];
|
||||||
.spawn()
|
let read = stdout.read(&mut buffer).unwrap();
|
||||||
.unwrap();
|
assert_eq!(read, 5);
|
||||||
let stdout = child.stdout.as_mut().unwrap();
|
let msg = std::str::from_utf8(&buffer).unwrap();
|
||||||
let mut buffer = [0; 5];
|
assert_eq!(msg, "READY");
|
||||||
let read = stdout.read(&mut buffer).unwrap();
|
|
||||||
assert_eq!(read, 5);
|
|
||||||
let msg = std::str::from_utf8(&buffer).unwrap();
|
|
||||||
assert_eq!(msg, "READY");
|
|
||||||
|
|
||||||
let cert = reqwest::Certificate::from_pem(include_bytes!(
|
let cert = reqwest::Certificate::from_pem(include_bytes!(
|
||||||
"../testdata/tls/RootCA.crt"
|
"../testdata/tls/RootCA.crt"
|
||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let client = reqwest::Client::builder()
|
let client = reqwest::Client::builder()
|
||||||
.add_root_certificate(cert)
|
.add_root_certificate(cert)
|
||||||
.http2_prior_knowledge()
|
.http2_prior_knowledge()
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let res = client.get("http://127.0.0.1:4506").send().await.unwrap();
|
let res = client.get("http://127.0.0.1:4506").send().await.unwrap();
|
||||||
assert_eq!(200, res.status());
|
assert_eq!(200, res.status());
|
||||||
|
|
||||||
let body = res.text().await.unwrap();
|
let body = res.text().await.unwrap();
|
||||||
assert_eq!(body, "http://127.0.0.1:4506/");
|
assert_eq!(body, "http://127.0.0.1:4506/");
|
||||||
|
|
||||||
child.kill().unwrap();
|
child.kill().unwrap();
|
||||||
child.wait().unwrap();
|
child.wait().unwrap();
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(windows))]
|
#[cfg(not(windows))]
|
||||||
|
@ -4173,7 +4166,7 @@ where
|
||||||
Fut::Output: Send + 'static,
|
Fut::Output: Send + 'static,
|
||||||
{
|
{
|
||||||
fn execute(&self, fut: Fut) {
|
fn execute(&self, fut: Fut) {
|
||||||
tokio::task::spawn(fut);
|
deno_core::task::spawn(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,11 +27,13 @@ use deno_core::futures::FutureExt;
|
||||||
use deno_core::futures::StreamExt;
|
use deno_core::futures::StreamExt;
|
||||||
use deno_core::located_script_name;
|
use deno_core::located_script_name;
|
||||||
use deno_core::serde_v8;
|
use deno_core::serde_v8;
|
||||||
|
use deno_core::task::spawn;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::v8;
|
use deno_core::v8;
|
||||||
use deno_core::ModuleSpecifier;
|
use deno_core::ModuleSpecifier;
|
||||||
use deno_runtime::permissions::Permissions;
|
use deno_runtime::permissions::Permissions;
|
||||||
use deno_runtime::permissions::PermissionsContainer;
|
use deno_runtime::permissions::PermissionsContainer;
|
||||||
use deno_runtime::tokio_util::run_local;
|
use deno_runtime::tokio_util::create_and_run_current_thread;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use indexmap::IndexSet;
|
use indexmap::IndexSet;
|
||||||
use log::Level;
|
use log::Level;
|
||||||
|
@ -436,7 +438,7 @@ async fn check_specifiers(
|
||||||
|
|
||||||
/// Run a single specifier as an executable bench module.
|
/// Run a single specifier as an executable bench module.
|
||||||
async fn bench_specifier(
|
async fn bench_specifier(
|
||||||
worker_factory: &CliMainWorkerFactory,
|
worker_factory: Arc<CliMainWorkerFactory>,
|
||||||
permissions: Permissions,
|
permissions: Permissions,
|
||||||
specifier: ModuleSpecifier,
|
specifier: ModuleSpecifier,
|
||||||
sender: UnboundedSender<BenchEvent>,
|
sender: UnboundedSender<BenchEvent>,
|
||||||
|
@ -522,15 +524,15 @@ async fn bench_specifiers(
|
||||||
let specifier = specifier;
|
let specifier = specifier;
|
||||||
let sender = sender.clone();
|
let sender = sender.clone();
|
||||||
let options = option_for_handles.clone();
|
let options = option_for_handles.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let future = bench_specifier(
|
let future = bench_specifier(
|
||||||
&worker_factory,
|
worker_factory,
|
||||||
permissions,
|
permissions,
|
||||||
specifier,
|
specifier,
|
||||||
sender,
|
sender,
|
||||||
options.filter,
|
options.filter,
|
||||||
);
|
);
|
||||||
run_local(future)
|
create_and_run_current_thread(future)
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -539,7 +541,7 @@ async fn bench_specifiers(
|
||||||
.collect::<Vec<Result<Result<(), AnyError>, tokio::task::JoinError>>>();
|
.collect::<Vec<Result<Result<(), AnyError>, tokio::task::JoinError>>>();
|
||||||
|
|
||||||
let handler = {
|
let handler = {
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
let mut used_only = false;
|
let mut used_only = false;
|
||||||
let mut report = BenchReport::new();
|
let mut report = BenchReport::new();
|
||||||
let mut reporter =
|
let mut reporter =
|
||||||
|
|
|
@ -28,6 +28,7 @@ use deno_core::error::generic_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::futures;
|
use deno_core::futures;
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use log::info;
|
use log::info;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
|
@ -629,7 +630,7 @@ where
|
||||||
let handles = file_paths.iter().map(|file_path| {
|
let handles = file_paths.iter().map(|file_path| {
|
||||||
let f = f.clone();
|
let f = f.clone();
|
||||||
let file_path = file_path.clone();
|
let file_path = file_path.clone();
|
||||||
tokio::task::spawn_blocking(move || f(file_path))
|
spawn_blocking(move || f(file_path))
|
||||||
});
|
});
|
||||||
let join_results = futures::future::join_all(handles).await;
|
let join_results = futures::future::join_all(handles).await;
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ use crate::factory::CliFactory;
|
||||||
use crate::file_fetcher::FileFetcher;
|
use crate::file_fetcher::FileFetcher;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::futures::StreamExt;
|
use deno_core::futures::StreamExt;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_runtime::permissions::Permissions;
|
use deno_runtime::permissions::Permissions;
|
||||||
use deno_runtime::permissions::PermissionsContainer;
|
use deno_runtime::permissions::PermissionsContainer;
|
||||||
use rustyline::error::ReadlineError;
|
use rustyline::error::ReadlineError;
|
||||||
|
@ -32,7 +33,7 @@ async fn read_line_and_poll(
|
||||||
editor: ReplEditor,
|
editor: ReplEditor,
|
||||||
) -> Result<String, ReadlineError> {
|
) -> Result<String, ReadlineError> {
|
||||||
#![allow(clippy::await_holding_refcell_ref)]
|
#![allow(clippy::await_holding_refcell_ref)]
|
||||||
let mut line_fut = tokio::task::spawn_blocking(move || editor.readline());
|
let mut line_fut = spawn_blocking(move || editor.readline());
|
||||||
let mut poll_worker = true;
|
let mut poll_worker = true;
|
||||||
let notifications_rc = repl_session.notifications.clone();
|
let notifications_rc = repl_session.notifications.clone();
|
||||||
let mut notifications = notifications_rc.borrow_mut();
|
let mut notifications = notifications_rc.borrow_mut();
|
||||||
|
|
|
@ -21,6 +21,7 @@ use indexmap::IndexMap;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
use tokio::task::LocalSet;
|
||||||
|
|
||||||
pub async fn execute_script(
|
pub async fn execute_script(
|
||||||
flags: Flags,
|
flags: Flags,
|
||||||
|
@ -59,9 +60,10 @@ pub async fn execute_script(
|
||||||
let seq_list = deno_task_shell::parser::parse(&script)
|
let seq_list = deno_task_shell::parser::parse(&script)
|
||||||
.with_context(|| format!("Error parsing script '{task_name}'."))?;
|
.with_context(|| format!("Error parsing script '{task_name}'."))?;
|
||||||
let env_vars = collect_env_vars();
|
let env_vars = collect_env_vars();
|
||||||
let exit_code =
|
let local = LocalSet::new();
|
||||||
deno_task_shell::execute(seq_list, env_vars, &cwd, Default::default())
|
let future =
|
||||||
.await;
|
deno_task_shell::execute(seq_list, env_vars, &cwd, Default::default());
|
||||||
|
let exit_code = local.run_until(future).await;
|
||||||
Ok(exit_code)
|
Ok(exit_code)
|
||||||
} else if let Some(script) = package_json_scripts.get(task_name) {
|
} else if let Some(script) = package_json_scripts.get(task_name) {
|
||||||
let package_json_deps_provider = factory.package_json_deps_provider();
|
let package_json_deps_provider = factory.package_json_deps_provider();
|
||||||
|
@ -109,8 +111,10 @@ pub async fn execute_script(
|
||||||
.with_context(|| format!("Error parsing script '{task_name}'."))?;
|
.with_context(|| format!("Error parsing script '{task_name}'."))?;
|
||||||
let npx_commands = resolve_npm_commands(npm_resolver, node_resolver)?;
|
let npx_commands = resolve_npm_commands(npm_resolver, node_resolver)?;
|
||||||
let env_vars = collect_env_vars();
|
let env_vars = collect_env_vars();
|
||||||
let exit_code =
|
let local = LocalSet::new();
|
||||||
deno_task_shell::execute(seq_list, env_vars, &cwd, npx_commands).await;
|
let future =
|
||||||
|
deno_task_shell::execute(seq_list, env_vars, &cwd, npx_commands);
|
||||||
|
let exit_code = local.run_until(future).await;
|
||||||
Ok(exit_code)
|
Ok(exit_code)
|
||||||
} else {
|
} else {
|
||||||
eprintln!("Task not found: {task_name}");
|
eprintln!("Task not found: {task_name}");
|
||||||
|
|
|
@ -34,6 +34,8 @@ use deno_core::futures::StreamExt;
|
||||||
use deno_core::located_script_name;
|
use deno_core::located_script_name;
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
use deno_core::serde_v8;
|
use deno_core::serde_v8;
|
||||||
|
use deno_core::task::spawn;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::url::Url;
|
use deno_core::url::Url;
|
||||||
use deno_core::v8;
|
use deno_core::v8;
|
||||||
use deno_core::ModuleSpecifier;
|
use deno_core::ModuleSpecifier;
|
||||||
|
@ -42,7 +44,7 @@ use deno_runtime::deno_io::StdioPipe;
|
||||||
use deno_runtime::fmt_errors::format_js_error;
|
use deno_runtime::fmt_errors::format_js_error;
|
||||||
use deno_runtime::permissions::Permissions;
|
use deno_runtime::permissions::Permissions;
|
||||||
use deno_runtime::permissions::PermissionsContainer;
|
use deno_runtime::permissions::PermissionsContainer;
|
||||||
use deno_runtime::tokio_util::run_local;
|
use deno_runtime::tokio_util::create_and_run_current_thread;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use indexmap::IndexSet;
|
use indexmap::IndexSet;
|
||||||
use log::Level;
|
use log::Level;
|
||||||
|
@ -916,12 +918,12 @@ pub fn format_test_error(js_error: &JsError) -> String {
|
||||||
/// Test a single specifier as documentation containing test programs, an executable test module or
|
/// Test a single specifier as documentation containing test programs, an executable test module or
|
||||||
/// both.
|
/// both.
|
||||||
pub async fn test_specifier(
|
pub async fn test_specifier(
|
||||||
worker_factory: &CliMainWorkerFactory,
|
worker_factory: Arc<CliMainWorkerFactory>,
|
||||||
permissions: Permissions,
|
permissions: Permissions,
|
||||||
specifier: ModuleSpecifier,
|
specifier: ModuleSpecifier,
|
||||||
mut sender: TestEventSender,
|
mut sender: TestEventSender,
|
||||||
fail_fast_tracker: FailFastTracker,
|
fail_fast_tracker: FailFastTracker,
|
||||||
options: &TestSpecifierOptions,
|
options: TestSpecifierOptions,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
if fail_fast_tracker.should_stop() {
|
if fail_fast_tracker.should_stop() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -1316,7 +1318,7 @@ async fn test_specifiers(
|
||||||
let concurrent_jobs = options.concurrent_jobs;
|
let concurrent_jobs = options.concurrent_jobs;
|
||||||
|
|
||||||
let sender_ = sender.downgrade();
|
let sender_ = sender.downgrade();
|
||||||
let sigint_handler_handle = tokio::task::spawn(async move {
|
let sigint_handler_handle = spawn(async move {
|
||||||
signal::ctrl_c().await.unwrap();
|
signal::ctrl_c().await.unwrap();
|
||||||
sender_.upgrade().map(|s| s.send(TestEvent::Sigint).ok());
|
sender_.upgrade().map(|s| s.send(TestEvent::Sigint).ok());
|
||||||
});
|
});
|
||||||
|
@ -1328,14 +1330,14 @@ async fn test_specifiers(
|
||||||
let sender = sender.clone();
|
let sender = sender.clone();
|
||||||
let fail_fast_tracker = FailFastTracker::new(options.fail_fast);
|
let fail_fast_tracker = FailFastTracker::new(options.fail_fast);
|
||||||
let specifier_options = options.specifier.clone();
|
let specifier_options = options.specifier.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
run_local(test_specifier(
|
create_and_run_current_thread(test_specifier(
|
||||||
&worker_factory,
|
worker_factory,
|
||||||
permissions,
|
permissions,
|
||||||
specifier,
|
specifier,
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
fail_fast_tracker,
|
fail_fast_tracker,
|
||||||
&specifier_options,
|
specifier_options,
|
||||||
))
|
))
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
@ -1350,7 +1352,7 @@ async fn test_specifiers(
|
||||||
));
|
));
|
||||||
|
|
||||||
let handler = {
|
let handler = {
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
let earlier = Instant::now();
|
let earlier = Instant::now();
|
||||||
let mut tests = IndexMap::new();
|
let mut tests = IndexMap::new();
|
||||||
let mut test_steps = IndexMap::new();
|
let mut test_steps = IndexMap::new();
|
||||||
|
@ -1887,7 +1889,7 @@ pub async fn run_tests_with_watch(
|
||||||
// run, a process-scoped basic exit handler is required due to a tokio
|
// run, a process-scoped basic exit handler is required due to a tokio
|
||||||
// limitation where it doesn't unbind its own handler for the entire process
|
// limitation where it doesn't unbind its own handler for the entire process
|
||||||
// once a user adds one.
|
// once a user adds one.
|
||||||
tokio::task::spawn(async move {
|
spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
signal::ctrl_c().await.unwrap();
|
signal::ctrl_c().await.unwrap();
|
||||||
if !HAS_TEST_RUN_SIGINT_HANDLER.load(Ordering::Relaxed) {
|
if !HAS_TEST_RUN_SIGINT_HANDLER.load(Ordering::Relaxed) {
|
||||||
|
@ -2070,7 +2072,7 @@ fn start_output_redirect_thread(
|
||||||
sender: UnboundedSender<TestEvent>,
|
sender: UnboundedSender<TestEvent>,
|
||||||
flush_state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
|
flush_state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
|
||||||
) {
|
) {
|
||||||
tokio::task::spawn_blocking(move || loop {
|
spawn_blocking(move || loop {
|
||||||
let mut buffer = [0; 512];
|
let mut buffer = [0; 512];
|
||||||
let size = match pipe_reader.read(&mut buffer) {
|
let size = match pipe_reader.read(&mut buffer) {
|
||||||
Ok(0) | Err(_) => break,
|
Ok(0) | Err(_) => break,
|
||||||
|
|
|
@ -17,6 +17,7 @@ use deno_core::anyhow::Context;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::futures::future::BoxFuture;
|
use deno_core::futures::future::BoxFuture;
|
||||||
use deno_core::futures::FutureExt;
|
use deno_core::futures::FutureExt;
|
||||||
|
use deno_core::task::spawn;
|
||||||
use deno_semver::Version;
|
use deno_semver::Version;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
@ -198,7 +199,7 @@ pub fn check_for_upgrades(
|
||||||
if update_checker.should_check_for_new_version() {
|
if update_checker.should_check_for_new_version() {
|
||||||
let env = update_checker.env.clone();
|
let env = update_checker.env.clone();
|
||||||
// do this asynchronously on a separate task
|
// do this asynchronously on a separate task
|
||||||
tokio::spawn(async move {
|
spawn(async move {
|
||||||
// Sleep for a small amount of time to not unnecessarily impact startup
|
// Sleep for a small amount of time to not unnecessarily impact startup
|
||||||
// time.
|
// time.
|
||||||
tokio::time::sleep(UPGRADE_CHECK_FETCH_DELAY).await;
|
tokio::time::sleep(UPGRADE_CHECK_FETCH_DELAY).await;
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
use console_static_text::ConsoleStaticText;
|
use console_static_text::ConsoleStaticText;
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_runtime::ops::tty::ConsoleSize;
|
use deno_runtime::ops::tty::ConsoleSize;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -162,7 +163,7 @@ impl DrawThread {
|
||||||
internal_state.has_draw_thread = true;
|
internal_state.has_draw_thread = true;
|
||||||
|
|
||||||
let drawer_id = internal_state.drawer_id;
|
let drawer_id = internal_state.drawer_id;
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let mut previous_size = console_size();
|
let mut previous_size = console_size();
|
||||||
loop {
|
loop {
|
||||||
let mut delay_ms = 120;
|
let mut delay_ms = 120;
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
use deno_core::anyhow::Context;
|
use deno_core::anyhow::Context;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
pub use deno_core::normalize_path;
|
pub use deno_core::normalize_path;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::ModuleSpecifier;
|
use deno_core::ModuleSpecifier;
|
||||||
use deno_runtime::deno_crypto::rand;
|
use deno_runtime::deno_crypto::rand;
|
||||||
use deno_runtime::deno_node::PathClean;
|
use deno_runtime::deno_node::PathClean;
|
||||||
|
@ -503,7 +504,7 @@ impl LaxSingleProcessFsFlag {
|
||||||
// This uses a blocking task because we use a single threaded
|
// This uses a blocking task because we use a single threaded
|
||||||
// runtime and this is time sensitive so we don't want it to update
|
// runtime and this is time sensitive so we don't want it to update
|
||||||
// at the whims of of whatever is occurring on the runtime thread.
|
// at the whims of of whatever is occurring on the runtime thread.
|
||||||
tokio::task::spawn_blocking({
|
spawn_blocking({
|
||||||
let token = token.clone();
|
let token = token.clone();
|
||||||
let last_updated_path = last_updated_path.clone();
|
let last_updated_path = last_updated_path.clone();
|
||||||
move || {
|
move || {
|
||||||
|
|
|
@ -36,6 +36,7 @@ serde_json = { workspace = true, features = ["preserve_order"] }
|
||||||
serde_v8.workspace = true
|
serde_v8.workspace = true
|
||||||
smallvec.workspace = true
|
smallvec.workspace = true
|
||||||
sourcemap = "6.1"
|
sourcemap = "6.1"
|
||||||
|
tokio.workspace = true
|
||||||
url.workspace = true
|
url.workspace = true
|
||||||
v8.workspace = true
|
v8.workspace = true
|
||||||
|
|
||||||
|
@ -46,4 +47,3 @@ path = "examples/http_bench_json_ops/main.rs"
|
||||||
# These dependencies are only used for the 'http_bench_*_ops' examples.
|
# These dependencies are only used for the 'http_bench_*_ops' examples.
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
deno_ast.workspace = true
|
deno_ast.workspace = true
|
||||||
tokio.workspace = true
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ mod resources;
|
||||||
mod runtime;
|
mod runtime;
|
||||||
pub mod snapshot_util;
|
pub mod snapshot_util;
|
||||||
mod source_map;
|
mod source_map;
|
||||||
|
pub mod task;
|
||||||
mod task_queue;
|
mod task_queue;
|
||||||
|
|
||||||
// Re-exports
|
// Re-exports
|
||||||
|
|
131
core/task.rs
Normal file
131
core/task.rs
Normal file
|
@ -0,0 +1,131 @@
|
||||||
|
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||||
|
use core::pin::Pin;
|
||||||
|
use core::task::Context;
|
||||||
|
use core::task::Poll;
|
||||||
|
use futures::Future;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use tokio::runtime::Handle;
|
||||||
|
use tokio::runtime::RuntimeFlavor;
|
||||||
|
|
||||||
|
/// Equivalent to [`tokio::task::JoinHandle`].
|
||||||
|
#[repr(transparent)]
|
||||||
|
pub struct JoinHandle<R> {
|
||||||
|
handle: tokio::task::JoinHandle<MaskResultAsSend<R>>,
|
||||||
|
_r: PhantomData<R>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R> JoinHandle<R> {
|
||||||
|
/// Equivalent to [`tokio::task::JoinHandle::abort`].
|
||||||
|
pub fn abort(&self) {
|
||||||
|
self.handle.abort()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R> Future for JoinHandle<R> {
|
||||||
|
type Output = Result<R, tokio::task::JoinError>;
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Self::Output> {
|
||||||
|
// SAFETY: We are sure that handle is valid here
|
||||||
|
unsafe {
|
||||||
|
let me: &mut Self = Pin::into_inner_unchecked(self);
|
||||||
|
let handle = Pin::new_unchecked(&mut me.handle);
|
||||||
|
match handle.poll(cx) {
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
Poll::Ready(Ok(r)) => Poll::Ready(Ok(r.into_inner())),
|
||||||
|
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Equivalent to [`tokio::task::spawn`], but does not require the future to be [`Send`]. Must only be
|
||||||
|
/// used on a [`RuntimeFlavor::CurrentThread`] executor, though this is only checked when running with
|
||||||
|
/// debug assertions.
|
||||||
|
pub fn spawn<F: Future<Output = R> + 'static, R: 'static>(
|
||||||
|
f: F,
|
||||||
|
) -> JoinHandle<R> {
|
||||||
|
debug_assert!(
|
||||||
|
Handle::current().runtime_flavor() == RuntimeFlavor::CurrentThread
|
||||||
|
);
|
||||||
|
// SAFETY: we know this is a current-thread executor
|
||||||
|
let future = unsafe { MaskFutureAsSend::new(f) };
|
||||||
|
JoinHandle {
|
||||||
|
handle: tokio::task::spawn(future),
|
||||||
|
_r: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Equivalent to [`tokio::task::spawn_blocking`]. Currently a thin wrapper around the tokio API, but this
|
||||||
|
/// may change in the future.
|
||||||
|
pub fn spawn_blocking<
|
||||||
|
F: (FnOnce() -> R) + Send + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
|
>(
|
||||||
|
f: F,
|
||||||
|
) -> JoinHandle<R> {
|
||||||
|
let handle = tokio::task::spawn_blocking(|| MaskResultAsSend { result: f() });
|
||||||
|
JoinHandle {
|
||||||
|
handle,
|
||||||
|
_r: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(transparent)]
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub struct MaskResultAsSend<R> {
|
||||||
|
result: R,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SAFETY: We ensure that Send bounds are only faked when tokio is running on a current-thread executor
|
||||||
|
unsafe impl<R> Send for MaskResultAsSend<R> {}
|
||||||
|
|
||||||
|
impl<R> MaskResultAsSend<R> {
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn into_inner(self) -> R {
|
||||||
|
self.result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MaskFutureAsSend<F> {
|
||||||
|
future: F,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F> MaskFutureAsSend<F> {
|
||||||
|
/// Mark a non-`Send` future as `Send`. This is a trick to be able to use
|
||||||
|
/// `tokio::spawn()` (which requires `Send` futures) in a current thread
|
||||||
|
/// runtime.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// You must ensure that the future is actually used on the same
|
||||||
|
/// thread, ie. always use current thread runtime flavor from Tokio.
|
||||||
|
pub unsafe fn new(future: F) -> Self {
|
||||||
|
Self { future }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SAFETY: we are cheating here - this struct is NOT really Send,
|
||||||
|
// but we need to mark it Send so that we can use `spawn()` in Tokio.
|
||||||
|
unsafe impl<F> Send for MaskFutureAsSend<F> {}
|
||||||
|
|
||||||
|
impl<F: Future> Future for MaskFutureAsSend<F> {
|
||||||
|
type Output = MaskResultAsSend<F::Output>;
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<MaskResultAsSend<F::Output>> {
|
||||||
|
// SAFETY: We are sure that future is valid here
|
||||||
|
unsafe {
|
||||||
|
let me: &mut MaskFutureAsSend<F> = Pin::into_inner_unchecked(self);
|
||||||
|
let future = Pin::new_unchecked(&mut me.future);
|
||||||
|
match future.poll(cx) {
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
Poll::Ready(result) => Poll::Ready(MaskResultAsSend { result }),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -127,7 +127,7 @@ mod tests {
|
||||||
for i in 0..100 {
|
for i in 0..100 {
|
||||||
let data = data.clone();
|
let data = data.clone();
|
||||||
tasks.push(task_queue.queue(async move {
|
tasks.push(task_queue.queue(async move {
|
||||||
tokio::task::spawn_blocking(move || {
|
crate::task::spawn_blocking(move || {
|
||||||
let mut data = data.lock();
|
let mut data = data.lock();
|
||||||
if *data != i {
|
if *data != i {
|
||||||
panic!("Value was not equal.");
|
panic!("Value was not equal.");
|
||||||
|
|
13
ext/cache/sqlite.rs
vendored
13
ext/cache/sqlite.rs
vendored
|
@ -10,6 +10,7 @@ use std::time::UNIX_EPOCH;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::AsyncResult;
|
use deno_core::AsyncResult;
|
||||||
use deno_core::ByteString;
|
use deno_core::ByteString;
|
||||||
|
@ -99,7 +100,7 @@ impl Cache for SqliteBackedCache {
|
||||||
async fn storage_open(&self, cache_name: String) -> Result<i64, AnyError> {
|
async fn storage_open(&self, cache_name: String) -> Result<i64, AnyError> {
|
||||||
let db = self.connection.clone();
|
let db = self.connection.clone();
|
||||||
let cache_storage_dir = self.cache_storage_dir.clone();
|
let cache_storage_dir = self.cache_storage_dir.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let db = db.lock();
|
let db = db.lock();
|
||||||
db.execute(
|
db.execute(
|
||||||
"INSERT OR IGNORE INTO cache_storage (cache_name) VALUES (?1)",
|
"INSERT OR IGNORE INTO cache_storage (cache_name) VALUES (?1)",
|
||||||
|
@ -124,7 +125,7 @@ impl Cache for SqliteBackedCache {
|
||||||
/// Note: this doesn't check the disk, it only checks the sqlite db.
|
/// Note: this doesn't check the disk, it only checks the sqlite db.
|
||||||
async fn storage_has(&self, cache_name: String) -> Result<bool, AnyError> {
|
async fn storage_has(&self, cache_name: String) -> Result<bool, AnyError> {
|
||||||
let db = self.connection.clone();
|
let db = self.connection.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let db = db.lock();
|
let db = db.lock();
|
||||||
let cache_exists = db.query_row(
|
let cache_exists = db.query_row(
|
||||||
"SELECT count(id) FROM cache_storage WHERE cache_name = ?1",
|
"SELECT count(id) FROM cache_storage WHERE cache_name = ?1",
|
||||||
|
@ -143,7 +144,7 @@ impl Cache for SqliteBackedCache {
|
||||||
async fn storage_delete(&self, cache_name: String) -> Result<bool, AnyError> {
|
async fn storage_delete(&self, cache_name: String) -> Result<bool, AnyError> {
|
||||||
let db = self.connection.clone();
|
let db = self.connection.clone();
|
||||||
let cache_storage_dir = self.cache_storage_dir.clone();
|
let cache_storage_dir = self.cache_storage_dir.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let db = db.lock();
|
let db = db.lock();
|
||||||
let maybe_cache_id = db
|
let maybe_cache_id = db
|
||||||
.query_row(
|
.query_row(
|
||||||
|
@ -210,7 +211,7 @@ impl Cache for SqliteBackedCache {
|
||||||
> {
|
> {
|
||||||
let db = self.connection.clone();
|
let db = self.connection.clone();
|
||||||
let cache_storage_dir = self.cache_storage_dir.clone();
|
let cache_storage_dir = self.cache_storage_dir.clone();
|
||||||
let query_result = tokio::task::spawn_blocking(move || {
|
let query_result = spawn_blocking(move || {
|
||||||
let db = db.lock();
|
let db = db.lock();
|
||||||
let result = db.query_row(
|
let result = db.query_row(
|
||||||
"SELECT response_body_key, response_headers, response_status, response_status_text, request_headers
|
"SELECT response_body_key, response_headers, response_status, response_status_text, request_headers
|
||||||
|
@ -269,7 +270,7 @@ impl Cache for SqliteBackedCache {
|
||||||
request: CacheDeleteRequest,
|
request: CacheDeleteRequest,
|
||||||
) -> Result<bool, AnyError> {
|
) -> Result<bool, AnyError> {
|
||||||
let db = self.connection.clone();
|
let db = self.connection.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
// TODO(@satyarohith): remove the response body from disk if one exists
|
// TODO(@satyarohith): remove the response body from disk if one exists
|
||||||
let db = db.lock();
|
let db = db.lock();
|
||||||
let rows_effected = db.execute(
|
let rows_effected = db.execute(
|
||||||
|
@ -287,7 +288,7 @@ async fn insert_cache_asset(
|
||||||
put: CachePutRequest,
|
put: CachePutRequest,
|
||||||
response_body_key: Option<String>,
|
response_body_key: Option<String>,
|
||||||
) -> Result<Option<String>, deno_core::anyhow::Error> {
|
) -> Result<Option<String>, deno_core::anyhow::Error> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let maybe_response_body = {
|
let maybe_response_body = {
|
||||||
let db = db.lock();
|
let db = db.lock();
|
||||||
db.query_row(
|
db.query_row(
|
||||||
|
|
|
@ -20,6 +20,7 @@ use deno_core::error::custom_error;
|
||||||
use deno_core::error::type_error;
|
use deno_core::error::type_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::ZeroCopyBuf;
|
use deno_core::ZeroCopyBuf;
|
||||||
use rsa::pkcs1::DecodeRsaPrivateKey;
|
use rsa::pkcs1::DecodeRsaPrivateKey;
|
||||||
use rsa::PaddingScheme;
|
use rsa::PaddingScheme;
|
||||||
|
@ -98,7 +99,7 @@ pub async fn op_crypto_decrypt(
|
||||||
tag_length,
|
tag_length,
|
||||||
} => decrypt_aes_gcm(key, length, tag_length, iv, additional_data, &data),
|
} => decrypt_aes_gcm(key, length, tag_length, iv, additional_data, &data),
|
||||||
};
|
};
|
||||||
let buf = tokio::task::spawn_blocking(fun).await.unwrap()?;
|
let buf = spawn_blocking(fun).await.unwrap()?;
|
||||||
Ok(buf.into())
|
Ok(buf.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ use ctr::Ctr64BE;
|
||||||
use deno_core::error::type_error;
|
use deno_core::error::type_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::ZeroCopyBuf;
|
use deno_core::ZeroCopyBuf;
|
||||||
use rand::rngs::OsRng;
|
use rand::rngs::OsRng;
|
||||||
use rsa::pkcs1::DecodeRsaPublicKey;
|
use rsa::pkcs1::DecodeRsaPublicKey;
|
||||||
|
@ -99,7 +100,7 @@ pub async fn op_crypto_encrypt(
|
||||||
key_length,
|
key_length,
|
||||||
} => encrypt_aes_ctr(key, key_length, &counter, ctr_length, &data),
|
} => encrypt_aes_ctr(key, key_length, &counter, ctr_length, &data),
|
||||||
};
|
};
|
||||||
let buf = tokio::task::spawn_blocking(fun).await.unwrap()?;
|
let buf = spawn_blocking(fun).await.unwrap()?;
|
||||||
Ok(buf.into())
|
Ok(buf.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::ZeroCopyBuf;
|
use deno_core::ZeroCopyBuf;
|
||||||
use elliptic_curve::rand_core::OsRng;
|
use elliptic_curve::rand_core::OsRng;
|
||||||
use num_traits::FromPrimitive;
|
use num_traits::FromPrimitive;
|
||||||
|
@ -56,7 +57,7 @@ pub async fn op_crypto_generate_key(
|
||||||
generate_key_hmac(hash, length)
|
generate_key_hmac(hash, length)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let buf = tokio::task::spawn_blocking(fun).await.unwrap()?;
|
let buf = spawn_blocking(fun).await.unwrap()?;
|
||||||
Ok(buf.into())
|
Ok(buf.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ use deno_core::error::type_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
use deno_core::ZeroCopyBuf;
|
use deno_core::ZeroCopyBuf;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
@ -601,7 +602,7 @@ pub async fn op_crypto_subtle_digest(
|
||||||
algorithm: CryptoHash,
|
algorithm: CryptoHash,
|
||||||
data: ZeroCopyBuf,
|
data: ZeroCopyBuf,
|
||||||
) -> Result<ZeroCopyBuf, AnyError> {
|
) -> Result<ZeroCopyBuf, AnyError> {
|
||||||
let output = tokio::task::spawn_blocking(move || {
|
let output = spawn_blocking(move || {
|
||||||
digest::digest(algorithm.into(), &data)
|
digest::digest(algorithm.into(), &data)
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.to_vec()
|
.to_vec()
|
||||||
|
|
|
@ -15,6 +15,7 @@ use deno_core::op;
|
||||||
use deno_core::serde_json::Value;
|
use deno_core::serde_json::Value;
|
||||||
use deno_core::serde_v8;
|
use deno_core::serde_v8;
|
||||||
use deno_core::serde_v8::ExternalPointer;
|
use deno_core::serde_v8::ExternalPointer;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::v8;
|
use deno_core::v8;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
use deno_core::ResourceId;
|
use deno_core::ResourceId;
|
||||||
|
@ -298,7 +299,7 @@ where
|
||||||
.map(|v| v8::Local::<v8::TypedArray>::try_from(v.v8_value).unwrap());
|
.map(|v| v8::Local::<v8::TypedArray>::try_from(v.v8_value).unwrap());
|
||||||
let out_buffer_ptr = out_buffer_as_ptr(scope, out_buffer);
|
let out_buffer_ptr = out_buffer_as_ptr(scope, out_buffer);
|
||||||
|
|
||||||
let join_handle = tokio::task::spawn_blocking(move || {
|
let join_handle = spawn_blocking(move || {
|
||||||
let PtrSymbol { cif, ptr } = symbol.clone();
|
let PtrSymbol { cif, ptr } = symbol.clone();
|
||||||
ffi_call(
|
ffi_call(
|
||||||
call_args,
|
call_args,
|
||||||
|
@ -345,7 +346,7 @@ pub fn op_ffi_call_nonblocking<'scope>(
|
||||||
.map(|v| v8::Local::<v8::TypedArray>::try_from(v.v8_value).unwrap());
|
.map(|v| v8::Local::<v8::TypedArray>::try_from(v.v8_value).unwrap());
|
||||||
let out_buffer_ptr = out_buffer_as_ptr(scope, out_buffer);
|
let out_buffer_ptr = out_buffer_as_ptr(scope, out_buffer);
|
||||||
|
|
||||||
let join_handle = tokio::task::spawn_blocking(move || {
|
let join_handle = spawn_blocking(move || {
|
||||||
let Symbol {
|
let Symbol {
|
||||||
cif,
|
cif,
|
||||||
ptr,
|
ptr,
|
||||||
|
|
|
@ -9,6 +9,7 @@ use std::path::Path;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_io::fs::File;
|
use deno_io::fs::File;
|
||||||
use deno_io::fs::FsResult;
|
use deno_io::fs::FsResult;
|
||||||
use deno_io::fs::FsStat;
|
use deno_io::fs::FsStat;
|
||||||
|
@ -86,8 +87,7 @@ impl FileSystem for RealFs {
|
||||||
options: OpenOptions,
|
options: OpenOptions,
|
||||||
) -> FsResult<Rc<dyn File>> {
|
) -> FsResult<Rc<dyn File>> {
|
||||||
let opts = open_options(options);
|
let opts = open_options(options);
|
||||||
let std_file =
|
let std_file = spawn_blocking(move || opts.open(path)).await??;
|
||||||
tokio::task::spawn_blocking(move || opts.open(path)).await??;
|
|
||||||
Ok(Rc::new(StdFileResourceInner::file(std_file)))
|
Ok(Rc::new(StdFileResourceInner::file(std_file)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,14 +105,14 @@ impl FileSystem for RealFs {
|
||||||
recursive: bool,
|
recursive: bool,
|
||||||
mode: u32,
|
mode: u32,
|
||||||
) -> FsResult<()> {
|
) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || mkdir(&path, recursive, mode)).await?
|
spawn_blocking(move || mkdir(&path, recursive, mode)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn chmod_sync(&self, path: &Path, mode: u32) -> FsResult<()> {
|
fn chmod_sync(&self, path: &Path, mode: u32) -> FsResult<()> {
|
||||||
chmod(path, mode)
|
chmod(path, mode)
|
||||||
}
|
}
|
||||||
async fn chmod_async(&self, path: PathBuf, mode: u32) -> FsResult<()> {
|
async fn chmod_async(&self, path: PathBuf, mode: u32) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || chmod(&path, mode)).await?
|
spawn_blocking(move || chmod(&path, mode)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn chown_sync(
|
fn chown_sync(
|
||||||
|
@ -129,53 +129,49 @@ impl FileSystem for RealFs {
|
||||||
uid: Option<u32>,
|
uid: Option<u32>,
|
||||||
gid: Option<u32>,
|
gid: Option<u32>,
|
||||||
) -> FsResult<()> {
|
) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || chown(&path, uid, gid)).await?
|
spawn_blocking(move || chown(&path, uid, gid)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_sync(&self, path: &Path, recursive: bool) -> FsResult<()> {
|
fn remove_sync(&self, path: &Path, recursive: bool) -> FsResult<()> {
|
||||||
remove(path, recursive)
|
remove(path, recursive)
|
||||||
}
|
}
|
||||||
async fn remove_async(&self, path: PathBuf, recursive: bool) -> FsResult<()> {
|
async fn remove_async(&self, path: PathBuf, recursive: bool) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || remove(&path, recursive)).await?
|
spawn_blocking(move || remove(&path, recursive)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn copy_file_sync(&self, from: &Path, to: &Path) -> FsResult<()> {
|
fn copy_file_sync(&self, from: &Path, to: &Path) -> FsResult<()> {
|
||||||
copy_file(from, to)
|
copy_file(from, to)
|
||||||
}
|
}
|
||||||
async fn copy_file_async(&self, from: PathBuf, to: PathBuf) -> FsResult<()> {
|
async fn copy_file_async(&self, from: PathBuf, to: PathBuf) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || copy_file(&from, &to)).await?
|
spawn_blocking(move || copy_file(&from, &to)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stat_sync(&self, path: &Path) -> FsResult<FsStat> {
|
fn stat_sync(&self, path: &Path) -> FsResult<FsStat> {
|
||||||
stat(path).map(Into::into)
|
stat(path).map(Into::into)
|
||||||
}
|
}
|
||||||
async fn stat_async(&self, path: PathBuf) -> FsResult<FsStat> {
|
async fn stat_async(&self, path: PathBuf) -> FsResult<FsStat> {
|
||||||
tokio::task::spawn_blocking(move || stat(&path))
|
spawn_blocking(move || stat(&path)).await?.map(Into::into)
|
||||||
.await?
|
|
||||||
.map(Into::into)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lstat_sync(&self, path: &Path) -> FsResult<FsStat> {
|
fn lstat_sync(&self, path: &Path) -> FsResult<FsStat> {
|
||||||
lstat(path).map(Into::into)
|
lstat(path).map(Into::into)
|
||||||
}
|
}
|
||||||
async fn lstat_async(&self, path: PathBuf) -> FsResult<FsStat> {
|
async fn lstat_async(&self, path: PathBuf) -> FsResult<FsStat> {
|
||||||
tokio::task::spawn_blocking(move || lstat(&path))
|
spawn_blocking(move || lstat(&path)).await?.map(Into::into)
|
||||||
.await?
|
|
||||||
.map(Into::into)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn realpath_sync(&self, path: &Path) -> FsResult<PathBuf> {
|
fn realpath_sync(&self, path: &Path) -> FsResult<PathBuf> {
|
||||||
realpath(path)
|
realpath(path)
|
||||||
}
|
}
|
||||||
async fn realpath_async(&self, path: PathBuf) -> FsResult<PathBuf> {
|
async fn realpath_async(&self, path: PathBuf) -> FsResult<PathBuf> {
|
||||||
tokio::task::spawn_blocking(move || realpath(&path)).await?
|
spawn_blocking(move || realpath(&path)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_dir_sync(&self, path: &Path) -> FsResult<Vec<FsDirEntry>> {
|
fn read_dir_sync(&self, path: &Path) -> FsResult<Vec<FsDirEntry>> {
|
||||||
read_dir(path)
|
read_dir(path)
|
||||||
}
|
}
|
||||||
async fn read_dir_async(&self, path: PathBuf) -> FsResult<Vec<FsDirEntry>> {
|
async fn read_dir_async(&self, path: PathBuf) -> FsResult<Vec<FsDirEntry>> {
|
||||||
tokio::task::spawn_blocking(move || read_dir(&path)).await?
|
spawn_blocking(move || read_dir(&path)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn rename_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
|
fn rename_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
|
||||||
|
@ -186,7 +182,7 @@ impl FileSystem for RealFs {
|
||||||
oldpath: PathBuf,
|
oldpath: PathBuf,
|
||||||
newpath: PathBuf,
|
newpath: PathBuf,
|
||||||
) -> FsResult<()> {
|
) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || fs::rename(oldpath, newpath))
|
spawn_blocking(move || fs::rename(oldpath, newpath))
|
||||||
.await?
|
.await?
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
@ -199,7 +195,7 @@ impl FileSystem for RealFs {
|
||||||
oldpath: PathBuf,
|
oldpath: PathBuf,
|
||||||
newpath: PathBuf,
|
newpath: PathBuf,
|
||||||
) -> FsResult<()> {
|
) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || fs::hard_link(oldpath, newpath))
|
spawn_blocking(move || fs::hard_link(oldpath, newpath))
|
||||||
.await?
|
.await?
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
@ -218,15 +214,14 @@ impl FileSystem for RealFs {
|
||||||
newpath: PathBuf,
|
newpath: PathBuf,
|
||||||
file_type: Option<FsFileType>,
|
file_type: Option<FsFileType>,
|
||||||
) -> FsResult<()> {
|
) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || symlink(&oldpath, &newpath, file_type))
|
spawn_blocking(move || symlink(&oldpath, &newpath, file_type)).await?
|
||||||
.await?
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_link_sync(&self, path: &Path) -> FsResult<PathBuf> {
|
fn read_link_sync(&self, path: &Path) -> FsResult<PathBuf> {
|
||||||
fs::read_link(path).map_err(Into::into)
|
fs::read_link(path).map_err(Into::into)
|
||||||
}
|
}
|
||||||
async fn read_link_async(&self, path: PathBuf) -> FsResult<PathBuf> {
|
async fn read_link_async(&self, path: PathBuf) -> FsResult<PathBuf> {
|
||||||
tokio::task::spawn_blocking(move || fs::read_link(path))
|
spawn_blocking(move || fs::read_link(path))
|
||||||
.await?
|
.await?
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
@ -235,7 +230,7 @@ impl FileSystem for RealFs {
|
||||||
truncate(path, len)
|
truncate(path, len)
|
||||||
}
|
}
|
||||||
async fn truncate_async(&self, path: PathBuf, len: u64) -> FsResult<()> {
|
async fn truncate_async(&self, path: PathBuf, len: u64) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || truncate(&path, len)).await?
|
spawn_blocking(move || truncate(&path, len)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn utime_sync(
|
fn utime_sync(
|
||||||
|
@ -260,7 +255,7 @@ impl FileSystem for RealFs {
|
||||||
) -> FsResult<()> {
|
) -> FsResult<()> {
|
||||||
let atime = filetime::FileTime::from_unix_time(atime_secs, atime_nanos);
|
let atime = filetime::FileTime::from_unix_time(atime_secs, atime_nanos);
|
||||||
let mtime = filetime::FileTime::from_unix_time(mtime_secs, mtime_nanos);
|
let mtime = filetime::FileTime::from_unix_time(mtime_secs, mtime_nanos);
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
filetime::set_file_times(path, atime, mtime).map_err(Into::into)
|
filetime::set_file_times(path, atime, mtime).map_err(Into::into)
|
||||||
})
|
})
|
||||||
.await?
|
.await?
|
||||||
|
@ -289,7 +284,7 @@ impl FileSystem for RealFs {
|
||||||
options: OpenOptions,
|
options: OpenOptions,
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
) -> FsResult<()> {
|
) -> FsResult<()> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let opts = open_options(options);
|
let opts = open_options(options);
|
||||||
let mut file = opts.open(path)?;
|
let mut file = opts.open(path)?;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
|
@ -307,7 +302,7 @@ impl FileSystem for RealFs {
|
||||||
fs::read(path).map_err(Into::into)
|
fs::read(path).map_err(Into::into)
|
||||||
}
|
}
|
||||||
async fn read_file_async(&self, path: PathBuf) -> FsResult<Vec<u8>> {
|
async fn read_file_async(&self, path: PathBuf) -> FsResult<Vec<u8>> {
|
||||||
tokio::task::spawn_blocking(move || fs::read(path))
|
spawn_blocking(move || fs::read(path))
|
||||||
.await?
|
.await?
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@ use cache_control::CacheControl;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::futures::TryFutureExt;
|
use deno_core::futures::TryFutureExt;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
use deno_core::task::spawn;
|
||||||
|
use deno_core::task::JoinHandle;
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::AsyncResult;
|
use deno_core::AsyncResult;
|
||||||
use deno_core::ByteString;
|
use deno_core::ByteString;
|
||||||
|
@ -68,9 +70,6 @@ use std::rc::Rc;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
use tokio::task::spawn_local;
|
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
type Request = hyper1::Request<Incoming>;
|
type Request = hyper1::Request<Incoming>;
|
||||||
type Response = hyper1::Response<ResponseBytes>;
|
type Response = hyper1::Response<ResponseBytes>;
|
||||||
|
|
||||||
|
@ -262,7 +261,7 @@ pub fn op_http_upgrade_raw(
|
||||||
let (read_rx, write_tx) = tokio::io::split(read);
|
let (read_rx, write_tx) = tokio::io::split(read);
|
||||||
let (mut write_rx, mut read_tx) = tokio::io::split(write);
|
let (mut write_rx, mut read_tx) = tokio::io::split(write);
|
||||||
|
|
||||||
spawn_local(async move {
|
spawn(async move {
|
||||||
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
|
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
|
||||||
|
|
||||||
// Stage 2: Extract the Upgraded connection
|
// Stage 2: Extract the Upgraded connection
|
||||||
|
@ -285,7 +284,7 @@ pub fn op_http_upgrade_raw(
|
||||||
// Stage 3: Pump the data
|
// Stage 3: Pump the data
|
||||||
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
|
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
|
||||||
|
|
||||||
spawn_local(async move {
|
spawn(async move {
|
||||||
let mut buf = [0; 1024];
|
let mut buf = [0; 1024];
|
||||||
loop {
|
loop {
|
||||||
let read = upgraded_rx.read(&mut buf).await?;
|
let read = upgraded_rx.read(&mut buf).await?;
|
||||||
|
@ -296,7 +295,7 @@ pub fn op_http_upgrade_raw(
|
||||||
}
|
}
|
||||||
Ok::<_, AnyError>(())
|
Ok::<_, AnyError>(())
|
||||||
});
|
});
|
||||||
spawn_local(async move {
|
spawn(async move {
|
||||||
let mut buf = [0; 1024];
|
let mut buf = [0; 1024];
|
||||||
loop {
|
loop {
|
||||||
let read = write_rx.read(&mut buf).await?;
|
let read = write_rx.read(&mut buf).await?;
|
||||||
|
@ -792,11 +791,10 @@ fn serve_https(
|
||||||
cancel: Rc<CancelHandle>,
|
cancel: Rc<CancelHandle>,
|
||||||
tx: tokio::sync::mpsc::Sender<u32>,
|
tx: tokio::sync::mpsc::Sender<u32>,
|
||||||
) -> JoinHandle<Result<(), AnyError>> {
|
) -> JoinHandle<Result<(), AnyError>> {
|
||||||
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
|
|
||||||
let svc = service_fn(move |req: Request| {
|
let svc = service_fn(move |req: Request| {
|
||||||
new_slab_future(req, request_info.clone(), tx.clone())
|
new_slab_future(req, request_info.clone(), tx.clone())
|
||||||
});
|
});
|
||||||
spawn_local(
|
spawn(
|
||||||
async {
|
async {
|
||||||
io.handshake().await?;
|
io.handshake().await?;
|
||||||
// If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
|
// If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
|
||||||
|
@ -820,11 +818,10 @@ fn serve_http(
|
||||||
cancel: Rc<CancelHandle>,
|
cancel: Rc<CancelHandle>,
|
||||||
tx: tokio::sync::mpsc::Sender<u32>,
|
tx: tokio::sync::mpsc::Sender<u32>,
|
||||||
) -> JoinHandle<Result<(), AnyError>> {
|
) -> JoinHandle<Result<(), AnyError>> {
|
||||||
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
|
|
||||||
let svc = service_fn(move |req: Request| {
|
let svc = service_fn(move |req: Request| {
|
||||||
new_slab_future(req, request_info.clone(), tx.clone())
|
new_slab_future(req, request_info.clone(), tx.clone())
|
||||||
});
|
});
|
||||||
spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
|
spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serve_http_on<HTTP>(
|
fn serve_http_on<HTTP>(
|
||||||
|
@ -916,7 +913,7 @@ where
|
||||||
let cancel_clone = resource.cancel_handle();
|
let cancel_clone = resource.cancel_handle();
|
||||||
|
|
||||||
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
|
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
|
||||||
let handle = spawn_local(async move {
|
let handle = spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
let conn = listener
|
let conn = listener
|
||||||
.accept()
|
.accept()
|
||||||
|
|
|
@ -20,6 +20,7 @@ use deno_core::futures::FutureExt;
|
||||||
use deno_core::futures::StreamExt;
|
use deno_core::futures::StreamExt;
|
||||||
use deno_core::futures::TryFutureExt;
|
use deno_core::futures::TryFutureExt;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
use deno_core::task::spawn;
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::AsyncResult;
|
use deno_core::AsyncResult;
|
||||||
use deno_core::BufView;
|
use deno_core::BufView;
|
||||||
|
@ -68,7 +69,6 @@ use std::task::Poll;
|
||||||
use tokio::io::AsyncRead;
|
use tokio::io::AsyncRead;
|
||||||
use tokio::io::AsyncWrite;
|
use tokio::io::AsyncWrite;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::task::spawn_local;
|
|
||||||
|
|
||||||
use crate::network_buffered_stream::NetworkBufferedStream;
|
use crate::network_buffered_stream::NetworkBufferedStream;
|
||||||
use crate::reader_stream::ExternallyAbortableReaderStream;
|
use crate::reader_stream::ExternallyAbortableReaderStream;
|
||||||
|
@ -184,7 +184,7 @@ impl HttpConnResource {
|
||||||
};
|
};
|
||||||
let (task_fut, closed_fut) = task_fut.remote_handle();
|
let (task_fut, closed_fut) = task_fut.remote_handle();
|
||||||
let closed_fut = closed_fut.shared();
|
let closed_fut = closed_fut.shared();
|
||||||
spawn_local(task_fut);
|
spawn(task_fut);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
addr,
|
addr,
|
||||||
|
@ -1005,7 +1005,7 @@ where
|
||||||
Fut::Output: 'static,
|
Fut::Output: 'static,
|
||||||
{
|
{
|
||||||
fn execute(&self, fut: Fut) {
|
fn execute(&self, fut: Fut) {
|
||||||
spawn_local(fut);
|
deno_core::task::spawn(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1015,7 +1015,7 @@ where
|
||||||
Fut::Output: 'static,
|
Fut::Output: 'static,
|
||||||
{
|
{
|
||||||
fn execute(&self, fut: Fut) {
|
fn execute(&self, fut: Fut) {
|
||||||
spawn_local(fut);
|
deno_core::task::spawn(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::AsyncMutFuture;
|
use deno_core::AsyncMutFuture;
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::AsyncResult;
|
use deno_core::AsyncResult;
|
||||||
|
@ -350,7 +351,7 @@ impl StdFileResourceInner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let (cell_value, result) = tokio::task::spawn_blocking(move || {
|
let (cell_value, result) = spawn_blocking(move || {
|
||||||
let result = action(&mut cell_value);
|
let result = action(&mut cell_value);
|
||||||
(cell_value, result)
|
(cell_value, result)
|
||||||
})
|
})
|
||||||
|
@ -372,7 +373,7 @@ impl StdFileResourceInner {
|
||||||
// we want to restrict this to one async action at a time
|
// we want to restrict this to one async action at a time
|
||||||
let _permit = self.cell_async_task_queue.acquire().await;
|
let _permit = self.cell_async_task_queue.acquire().await;
|
||||||
|
|
||||||
tokio::task::spawn_blocking(action).await.unwrap()
|
spawn_blocking(action).await.unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ use deno_core::futures::task::Waker;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
|
||||||
use deno_core::parking_lot::Mutex;
|
use deno_core::parking_lot::Mutex;
|
||||||
|
use deno_core::task::spawn;
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::AsyncResult;
|
use deno_core::AsyncResult;
|
||||||
use deno_core::ByteString;
|
use deno_core::ByteString;
|
||||||
|
@ -74,7 +75,6 @@ use tokio::io::AsyncWriteExt;
|
||||||
use tokio::io::ReadBuf;
|
use tokio::io::ReadBuf;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::task::spawn_local;
|
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||||
enum Flow {
|
enum Flow {
|
||||||
|
@ -224,9 +224,9 @@ impl Drop for TlsStream {
|
||||||
let use_linger_task = inner.poll_close(&mut cx).is_pending();
|
let use_linger_task = inner.poll_close(&mut cx).is_pending();
|
||||||
|
|
||||||
if use_linger_task {
|
if use_linger_task {
|
||||||
spawn_local(poll_fn(move |cx| inner.poll_close(cx)));
|
spawn(poll_fn(move |cx| inner.poll_close(cx)));
|
||||||
} else if cfg!(debug_assertions) {
|
} else if cfg!(debug_assertions) {
|
||||||
spawn_local(async {}); // Spawn dummy task to detect missing LocalSet.
|
spawn(async {}); // Spawn dummy task to detect missing runtime.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ use deno_core::error::type_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
use deno_core::serde_v8;
|
use deno_core::serde_v8;
|
||||||
|
use deno_core::task::spawn_blocking;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
use deno_core::ResourceId;
|
use deno_core::ResourceId;
|
||||||
use deno_core::StringOrBuffer;
|
use deno_core::StringOrBuffer;
|
||||||
|
@ -57,12 +58,7 @@ pub async fn op_node_check_prime_async(
|
||||||
checks: usize,
|
checks: usize,
|
||||||
) -> Result<bool, AnyError> {
|
) -> Result<bool, AnyError> {
|
||||||
// TODO(@littledivy): use rayon for CPU-bound tasks
|
// TODO(@littledivy): use rayon for CPU-bound tasks
|
||||||
Ok(
|
Ok(spawn_blocking(move || primes::is_probably_prime(&num, checks)).await?)
|
||||||
tokio::task::spawn_blocking(move || {
|
|
||||||
primes::is_probably_prime(&num, checks)
|
|
||||||
})
|
|
||||||
.await?,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
|
@ -74,10 +70,8 @@ pub fn op_node_check_prime_bytes_async(
|
||||||
// TODO(@littledivy): use rayon for CPU-bound tasks
|
// TODO(@littledivy): use rayon for CPU-bound tasks
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
Ok(
|
Ok(
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || primes::is_probably_prime(&candidate, checks))
|
||||||
primes::is_probably_prime(&candidate, checks)
|
.await?,
|
||||||
})
|
|
||||||
.await?,
|
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -462,7 +456,7 @@ pub async fn op_node_pbkdf2_async(
|
||||||
digest: String,
|
digest: String,
|
||||||
keylen: usize,
|
keylen: usize,
|
||||||
) -> Result<ZeroCopyBuf, AnyError> {
|
) -> Result<ZeroCopyBuf, AnyError> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let mut derived_key = vec![0; keylen];
|
let mut derived_key = vec![0; keylen];
|
||||||
pbkdf2_sync(&password, &salt, iterations, &digest, &mut derived_key)
|
pbkdf2_sync(&password, &salt, iterations, &digest, &mut derived_key)
|
||||||
.map(|_| derived_key.into())
|
.map(|_| derived_key.into())
|
||||||
|
@ -477,7 +471,7 @@ pub fn op_node_generate_secret(buf: &mut [u8]) {
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
pub async fn op_node_generate_secret_async(len: i32) -> ZeroCopyBuf {
|
pub async fn op_node_generate_secret_async(len: i32) -> ZeroCopyBuf {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let mut buf = vec![0u8; len as usize];
|
let mut buf = vec![0u8; len as usize];
|
||||||
rand::thread_rng().fill(&mut buf[..]);
|
rand::thread_rng().fill(&mut buf[..]);
|
||||||
buf.into()
|
buf.into()
|
||||||
|
@ -535,7 +529,7 @@ pub async fn op_node_hkdf_async(
|
||||||
info: ZeroCopyBuf,
|
info: ZeroCopyBuf,
|
||||||
okm_len: usize,
|
okm_len: usize,
|
||||||
) -> Result<ZeroCopyBuf, AnyError> {
|
) -> Result<ZeroCopyBuf, AnyError> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let mut okm = vec![0u8; okm_len];
|
let mut okm = vec![0u8; okm_len];
|
||||||
hkdf_sync(&hash, &ikm, &salt, &info, &mut okm)?;
|
hkdf_sync(&hash, &ikm, &salt, &info, &mut okm)?;
|
||||||
Ok(okm.into())
|
Ok(okm.into())
|
||||||
|
@ -578,10 +572,7 @@ pub async fn op_node_generate_rsa_async(
|
||||||
modulus_length: usize,
|
modulus_length: usize,
|
||||||
public_exponent: usize,
|
public_exponent: usize,
|
||||||
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || generate_rsa(modulus_length, public_exponent)).await?
|
||||||
generate_rsa(modulus_length, public_exponent)
|
|
||||||
})
|
|
||||||
.await?
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dsa_generate(
|
fn dsa_generate(
|
||||||
|
@ -635,10 +626,7 @@ pub async fn op_node_dsa_generate_async(
|
||||||
modulus_length: usize,
|
modulus_length: usize,
|
||||||
divisor_length: usize,
|
divisor_length: usize,
|
||||||
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || dsa_generate(modulus_length, divisor_length)).await?
|
||||||
dsa_generate(modulus_length, divisor_length)
|
|
||||||
})
|
|
||||||
.await?
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ec_generate(
|
fn ec_generate(
|
||||||
|
@ -677,7 +665,7 @@ pub fn op_node_ec_generate(
|
||||||
pub async fn op_node_ec_generate_async(
|
pub async fn op_node_ec_generate_async(
|
||||||
named_curve: String,
|
named_curve: String,
|
||||||
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
tokio::task::spawn_blocking(move || ec_generate(&named_curve)).await?
|
spawn_blocking(move || ec_generate(&named_curve)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ed25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
fn ed25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
|
@ -704,7 +692,7 @@ pub fn op_node_ed25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError
|
||||||
#[op]
|
#[op]
|
||||||
pub async fn op_node_ed25519_generate_async(
|
pub async fn op_node_ed25519_generate_async(
|
||||||
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
tokio::task::spawn_blocking(ed25519_generate).await?
|
spawn_blocking(ed25519_generate).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn x25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
fn x25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
|
@ -739,7 +727,7 @@ pub fn op_node_x25519_generate() -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError>
|
||||||
#[op]
|
#[op]
|
||||||
pub async fn op_node_x25519_generate_async(
|
pub async fn op_node_x25519_generate_async(
|
||||||
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
tokio::task::spawn_blocking(x25519_generate).await?
|
spawn_blocking(x25519_generate).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dh_generate_group(
|
fn dh_generate_group(
|
||||||
|
@ -772,7 +760,7 @@ pub fn op_node_dh_generate_group(
|
||||||
pub async fn op_node_dh_generate_group_async(
|
pub async fn op_node_dh_generate_group_async(
|
||||||
group_name: String,
|
group_name: String,
|
||||||
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
tokio::task::spawn_blocking(move || dh_generate_group(&group_name)).await?
|
spawn_blocking(move || dh_generate_group(&group_name)).await?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dh_generate(
|
fn dh_generate(
|
||||||
|
@ -806,10 +794,8 @@ pub async fn op_node_dh_generate_async(
|
||||||
prime_len: usize,
|
prime_len: usize,
|
||||||
generator: usize,
|
generator: usize,
|
||||||
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
) -> Result<(ZeroCopyBuf, ZeroCopyBuf), AnyError> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || dh_generate(prime.as_deref(), prime_len, generator))
|
||||||
dh_generate(prime.as_deref(), prime_len, generator)
|
.await?
|
||||||
})
|
|
||||||
.await?
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
|
@ -885,7 +871,7 @@ pub async fn op_node_scrypt_async(
|
||||||
parallelization: u32,
|
parallelization: u32,
|
||||||
maxmem: u32,
|
maxmem: u32,
|
||||||
) -> Result<ZeroCopyBuf, AnyError> {
|
) -> Result<ZeroCopyBuf, AnyError> {
|
||||||
tokio::task::spawn_blocking(move || {
|
spawn_blocking(move || {
|
||||||
let mut output_buffer = vec![0u8; keylen as usize];
|
let mut output_buffer = vec![0u8; keylen as usize];
|
||||||
let res = scrypt(
|
let res = scrypt(
|
||||||
password,
|
password,
|
||||||
|
@ -1081,5 +1067,5 @@ pub fn op_node_gen_prime(size: usize) -> ZeroCopyBuf {
|
||||||
pub async fn op_node_gen_prime_async(
|
pub async fn op_node_gen_prime_async(
|
||||||
size: usize,
|
size: usize,
|
||||||
) -> Result<ZeroCopyBuf, AnyError> {
|
) -> Result<ZeroCopyBuf, AnyError> {
|
||||||
Ok(tokio::task::spawn_blocking(move || gen_prime(size)).await?)
|
Ok(spawn_blocking(move || gen_prime(size)).await?)
|
||||||
}
|
}
|
||||||
|
|
|
@ -591,6 +591,6 @@ where
|
||||||
Fut::Output: 'static,
|
Fut::Output: 'static,
|
||||||
{
|
{
|
||||||
fn execute(&self, fut: Fut) {
|
fn execute(&self, fut: Fut) {
|
||||||
tokio::task::spawn_local(fut);
|
deno_core::task::spawn(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ use deno_core::futures::task::Poll;
|
||||||
use deno_core::serde_json;
|
use deno_core::serde_json;
|
||||||
use deno_core::serde_json::json;
|
use deno_core::serde_json::json;
|
||||||
use deno_core::serde_json::Value;
|
use deno_core::serde_json::Value;
|
||||||
|
use deno_core::task::spawn;
|
||||||
use deno_core::InspectorMsg;
|
use deno_core::InspectorMsg;
|
||||||
use deno_core::InspectorSessionProxy;
|
use deno_core::InspectorSessionProxy;
|
||||||
use deno_core::JsRuntime;
|
use deno_core::JsRuntime;
|
||||||
|
@ -109,7 +110,7 @@ where
|
||||||
Fut::Output: 'static,
|
Fut::Output: 'static,
|
||||||
{
|
{
|
||||||
fn execute(&self, fut: Fut) {
|
fn execute(&self, fut: Fut) {
|
||||||
tokio::task::spawn_local(fut);
|
deno_core::task::spawn(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +161,7 @@ fn handle_ws_request(
|
||||||
|
|
||||||
// spawn a task that will wait for websocket connection and then pump messages between
|
// spawn a task that will wait for websocket connection and then pump messages between
|
||||||
// the socket and inspector proxy
|
// the socket and inspector proxy
|
||||||
tokio::task::spawn_local(async move {
|
spawn(async move {
|
||||||
let websocket = if let Ok(w) = fut.await {
|
let websocket = if let Ok(w) = fut.await {
|
||||||
w
|
w
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use deno_core::task::MaskFutureAsSend;
|
||||||
|
|
||||||
pub fn create_basic_runtime() -> tokio::runtime::Runtime {
|
pub fn create_basic_runtime() -> tokio::runtime::Runtime {
|
||||||
tokio::runtime::Builder::new_current_thread()
|
tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_io()
|
.enable_io()
|
||||||
|
@ -14,11 +16,14 @@ pub fn create_basic_runtime() -> tokio::runtime::Runtime {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run_local<F, R>(future: F) -> R
|
pub fn create_and_run_current_thread<F, R>(future: F) -> R
|
||||||
where
|
where
|
||||||
F: std::future::Future<Output = R>,
|
F: std::future::Future<Output = R> + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
{
|
{
|
||||||
let rt = create_basic_runtime();
|
let rt = create_basic_runtime();
|
||||||
let local = tokio::task::LocalSet::new();
|
// SAFETY: this this is guaranteed to be running on a current-thread executor
|
||||||
local.block_on(&rt, future)
|
let future = unsafe { MaskFutureAsSend::new(future) };
|
||||||
|
let join_handle = rt.spawn(future);
|
||||||
|
rt.block_on(join_handle).unwrap().into_inner()
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use crate::colors;
|
||||||
use crate::inspector_server::InspectorServer;
|
use crate::inspector_server::InspectorServer;
|
||||||
use crate::ops;
|
use crate::ops;
|
||||||
use crate::permissions::PermissionsContainer;
|
use crate::permissions::PermissionsContainer;
|
||||||
use crate::tokio_util::run_local;
|
use crate::tokio_util::create_and_run_current_thread;
|
||||||
use crate::worker::FormatJsErrorFn;
|
use crate::worker::FormatJsErrorFn;
|
||||||
use crate::BootstrapOptions;
|
use crate::BootstrapOptions;
|
||||||
use deno_broadcast_channel::InMemoryBroadcastChannel;
|
use deno_broadcast_channel::InMemoryBroadcastChannel;
|
||||||
|
@ -838,5 +838,5 @@ pub fn run_web_worker(
|
||||||
debug!("Worker thread shuts down {}", &name);
|
debug!("Worker thread shuts down {}", &name);
|
||||||
result
|
result
|
||||||
};
|
};
|
||||||
run_local(fut)
|
create_and_run_current_thread(fut)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3729,7 +3729,9 @@
|
||||||
],
|
],
|
||||||
"request-referrer.any.html": false,
|
"request-referrer.any.html": false,
|
||||||
"request-referrer.any.worker.html": false,
|
"request-referrer.any.worker.html": false,
|
||||||
"response-null-body.any.html": true
|
"response-null-body.any.html": {
|
||||||
|
"ignore": true
|
||||||
|
}
|
||||||
},
|
},
|
||||||
"response": {
|
"response": {
|
||||||
"json.any.html": true,
|
"json.any.html": true,
|
||||||
|
|
Loading…
Reference in a new issue