1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-05 05:49:20 -05:00
denoland-deno/runtime/tokio_util.rs

138 lines
4.3 KiB
Rust
Raw Normal View History

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::fmt::Debug;
use std::str::FromStr;
use deno_core::unsync::MaskFutureAsSend;
#[cfg(tokio_unstable)]
use tokio_metrics::RuntimeMonitor;
/// Default tuning values for the tokio runtime builder.
///
/// The tuple is ordered as `(event_interval, global_queue_interval,
/// max_io_events_per_tick)`. In the future, this function may return different
/// defaults depending on the platform and/or CPU layout.
const fn tokio_configuration() -> (u32, u32, usize) {
  const EVENT_INTERVAL: u32 = 61;
  const GLOBAL_QUEUE_INTERVAL: u32 = 31;
  const MAX_IO_EVENTS_PER_TICK: usize = 1024;
  (EVENT_INTERVAL, GLOBAL_QUEUE_INTERVAL, MAX_IO_EVENTS_PER_TICK)
}
/// Reads the environment variable `name` and parses it as a `T`.
///
/// Returns `default` when the variable cannot be read, i.e. when it is unset
/// or its value is not valid Unicode.
///
/// # Panics
///
/// Panics when the variable is set but its value does not parse as a `T`. The
/// panic message names the variable and the rejected value so that a
/// misconfigured override is easy to track down.
fn tokio_env<T: FromStr>(name: &'static str, default: T) -> T
where
  <T as FromStr>::Err: Debug,
{
  match std::env::var(name) {
    Ok(value) => value.parse().unwrap_or_else(|err| {
      // A malformed override is a configuration error; keep failing loudly,
      // but say which variable and which value were at fault.
      panic!(
        "invalid value {:?} for environment variable {}: {:?}",
        value, name, err
      )
    }),
    Err(_) => default,
  }
}
pub fn create_basic_runtime() -> tokio::runtime::Runtime {
let (event_interval, global_queue_interval, max_io_events_per_tick) =
tokio_configuration();
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.event_interval(tokio_env("DENO_TOKIO_EVENT_INTERVAL", event_interval))
.global_queue_interval(tokio_env(
"DENO_TOKIO_GLOBAL_QUEUE_INTERVAL",
global_queue_interval,
))
.max_io_events_per_tick(tokio_env(
"DENO_TOKIO_MAX_IO_EVENTS_PER_TICK",
max_io_events_per_tick,
))
// This limits the number of threads for blocking operations (like for
// synchronous fs ops) or CPU bound tasks like when we run dprint in
// parallel for deno fmt.
// The default value is 512, which is an unhelpfully large thread pool. We
// don't ever want to have more than a couple dozen threads.
fix(cli): increase size of blocking task threadpool on windows (#26465) Fixes #26179. The original error reported in that issue is fixed on canary, but in local testing on my windows machine, `next build` would just hang forever. After some digging, what happens is that at some point in next build, readFile promises (from `fs/promises` ) just never resolve, and so next hangs. It turns out the issue is saturating tokio's blocking task thread pool. We previously limited the number of blocking threads to 32, and at some point those threads are all in use and there's no thread available for the file reads. What's taking up all of those threads? The answer turns out to be `tokio::process`. On windows, child process stdio uses the blocking threadpool: https://github.com/tokio-rs/tokio/pull/4824. When you poll the child's stdio on windows, it spawns a blocking task per poll, and calls `std::io::Read::read` in the blocking context. That call can block until data is available. Putting it all together, what happens is that Next.js spawns `2 * the number of CPU cores` deno child subprocesses to do work. We implement `child_process` with `tokio::process`. When the child processes' stdio get polled, blocking tasks get spawned, and those blocking tasks might block until data is available. So if you have 16 cores (as I do), there are going to be potentially >32 blocking task threadpool threads taken just by the child processes. That leaves no room for other tasks to make progress --- To fix this, for now, increase the size of the blocking threadpool on windows. 4 * the number of CPU cores should be enough to leave room for other tasks to make progress. Longer term, this can be fixed more properly when we handroll our own subprocess code (needed for detached processes and additional pipes on windows).
2024-10-22 15:52:18 -04:00
.max_blocking_threads(if cfg!(windows) {
// on windows, tokio uses blocking tasks for child process IO, make sure
// we have enough available threads for other tasks to run
4 * std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(8)
} else {
32
})
.build()
.unwrap()
}
/// Creates a runtime with `create_basic_runtime`, spawns `future` on it, blocks
/// the calling thread until the future completes and returns its output. The
/// runtime is then shut down in the background.
///
/// `metrics_enabled` only has an effect when compiled with
/// `cfg(tokio_unstable)`: if it is `true`, an additional task is spawned that
/// prints each `RuntimeMonitor` interval to stderr, sleeping
/// `DENO_TOKIO_METRICS_INTERVAL` milliseconds (1000 if the variable is unset
/// or does not parse as a `u64`) between samples.
///
/// # Panics
///
/// Panics if the runtime cannot be created, or if awaiting the spawned task's
/// join handle yields an error.
#[inline(always)]
fn create_and_run_current_thread_inner<F, R>(
future: F,
metrics_enabled: bool,
) -> R
where
F: std::future::Future<Output = R> + 'static,
R: Send + 'static,
{
let rt = create_basic_runtime();
// Since this is the main future, we want to box it in debug mode because it tends to be fairly
// large and the compiler won't optimize repeated copies. We also make this runtime factory
// function #[inline(always)] to avoid holding the unboxed, unused future on the stack.
#[cfg(debug_assertions)]
// SAFETY: this is guaranteed to be running on a current-thread executor
let future = Box::pin(unsafe { MaskFutureAsSend::new(future) });
#[cfg(not(debug_assertions))]
// SAFETY: this is guaranteed to be running on a current-thread executor
let future = unsafe { MaskFutureAsSend::new(future) };
// With tokio_unstable, optionally run a metrics reporter alongside the main
// future; otherwise the future is spawned directly.
#[cfg(tokio_unstable)]
let join_handle = if metrics_enabled {
rt.spawn(async move {
// Sampling period in milliseconds; falls back to 1000 when the variable is
// unset or is not a valid u64.
let metrics_interval: u64 = std::env::var("DENO_TOKIO_METRICS_INTERVAL")
.ok()
.and_then(|val| val.parse().ok())
.unwrap_or(1000);
let handle = tokio::runtime::Handle::current();
let runtime_monitor = RuntimeMonitor::new(&handle);
// The reporter is a separate task, so the main future below is awaited
// without waiting for the reporting loop.
tokio::spawn(async move {
#[allow(clippy::print_stderr)]
for interval in runtime_monitor.intervals() {
eprintln!("{:#?}", interval);
// wait `metrics_interval` milliseconds (1000 by default) before
// printing the next sample
tokio::time::sleep(std::time::Duration::from_millis(
metrics_interval,
))
.await;
}
});
future.await
})
} else {
rt.spawn(future)
};
#[cfg(not(tokio_unstable))]
let join_handle = rt.spawn(future);
// Drive the runtime until the spawned task finishes, then take the output
// out of the value the masked future produced.
let r = rt.block_on(join_handle).unwrap().into_inner();
// Forcefully shutdown the runtime - we're done executing JS code at this
// point, but there might be outstanding blocking tasks that were created and
// later "unrefed". They won't terminate on their own, so we're forcing
// termination of Tokio runtime at this point.
rt.shutdown_background();
r
}
/// Runs `future` to completion via `create_and_run_current_thread_inner` and
/// returns its output. Runtime metrics reporting is never requested on this
/// path.
#[inline(always)]
pub fn create_and_run_current_thread<F, R>(future: F) -> R
where
  F: std::future::Future<Output = R> + 'static,
  R: Send + 'static,
{
  let metrics_enabled = false;
  create_and_run_current_thread_inner(future, metrics_enabled)
}
/// Runs `future` to completion via `create_and_run_current_thread_inner` and
/// returns its output, requesting runtime metrics reporting when the
/// `DENO_TOKIO_METRICS` environment variable can be read (its value is not
/// inspected).
#[inline(always)]
pub fn create_and_run_current_thread_with_maybe_metrics<F, R>(future: F) -> R
where
  F: std::future::Future<Output = R> + 'static,
  R: Send + 'static,
{
  // Presence of the variable is the switch; any readable value enables it.
  let wants_metrics = std::env::var("DENO_TOKIO_METRICS").is_ok();
  create_and_run_current_thread_inner(future, wants_metrics)
}