mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
feat(serve): Opt-in parallelism for deno serve
(#24920)
Adds a `parallel` flag to `deno serve`. When present, we spawn multiple workers to parallelize serving requests. ```bash deno serve --parallel main.ts ``` Currently on linux we use `SO_REUSEPORT` and rely on the fact that the kernel will distribute connections in a round-robin manner. On mac and windows, we sort of emulate this by cloning the underlying file descriptor and passing a handle to each worker. The connections will not be guaranteed to be fairly distributed (and in practice almost certainly won't be), but the distribution is still spread enough to provide a significant performance increase. --- (Run on an Macbook Pro with an M3 Max, serving `deno.com` baseline:: ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 239.78ms 13.56ms 330.54ms 79.12% Req/Sec 258.58 35.56 360.00 70.64% Latency Distribution 50% 236.72ms 75% 248.46ms 90% 256.84ms 99% 268.23ms 15458 requests in 30.02s, 2.47GB read Requests/sec: 514.89 Transfer/sec: 84.33MB ``` this PR (`with --parallel` flag) ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 117.40ms 142.84ms 590.45ms 79.07% Req/Sec 1.33k 175.19 1.77k 69.00% Latency Distribution 50% 22.34ms 75% 223.67ms 90% 357.32ms 99% 460.50ms 79636 requests in 30.07s, 12.74GB read Requests/sec: 2647.96 Transfer/sec: 433.71MB ```
This commit is contained in:
parent
875ee618d3
commit
e92a05b551
15 changed files with 617 additions and 141 deletions
|
@ -229,6 +229,12 @@ opt-level = 'z' # Optimize for size
|
||||||
inherits = "release"
|
inherits = "release"
|
||||||
debug = true
|
debug = true
|
||||||
|
|
||||||
|
# Faster to compile than `release` but with similar performance.
|
||||||
|
[profile.release-lite]
|
||||||
|
inherits = "release"
|
||||||
|
codegen-units = 128
|
||||||
|
lto = "thin"
|
||||||
|
|
||||||
# NB: the `bench` and `release` profiles must remain EXACTLY the same.
|
# NB: the `bench` and `release` profiles must remain EXACTLY the same.
|
||||||
[profile.bench]
|
[profile.bench]
|
||||||
codegen-units = 1
|
codegen-units = 1
|
||||||
|
|
|
@ -339,6 +339,7 @@ pub struct ServeFlags {
|
||||||
pub watch: Option<WatchFlagsWithPaths>,
|
pub watch: Option<WatchFlagsWithPaths>,
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
pub host: String,
|
pub host: String,
|
||||||
|
pub worker_count: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServeFlags {
|
impl ServeFlags {
|
||||||
|
@ -349,6 +350,7 @@ impl ServeFlags {
|
||||||
watch: None,
|
watch: None,
|
||||||
port,
|
port,
|
||||||
host: host.to_owned(),
|
host: host.to_owned(),
|
||||||
|
worker_count: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2693,6 +2695,9 @@ fn serve_subcommand() -> Command {
|
||||||
.help("The TCP address to serve on, defaulting to 0.0.0.0 (all interfaces).")
|
.help("The TCP address to serve on, defaulting to 0.0.0.0 (all interfaces).")
|
||||||
.value_parser(serve_host_validator),
|
.value_parser(serve_host_validator),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
parallel_arg("multiple server workers", false)
|
||||||
|
)
|
||||||
.arg(check_arg(false))
|
.arg(check_arg(false))
|
||||||
.arg(watch_arg(true))
|
.arg(watch_arg(true))
|
||||||
.arg(watch_exclude_arg())
|
.arg(watch_exclude_arg())
|
||||||
|
@ -2854,11 +2859,7 @@ Directory arguments are expanded to all contained files matching the glob
|
||||||
.action(ArgAction::SetTrue),
|
.action(ArgAction::SetTrue),
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::new("parallel")
|
parallel_arg("test modules", true)
|
||||||
.long("parallel")
|
|
||||||
.help("Run test modules in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable.")
|
|
||||||
.conflicts_with("jobs")
|
|
||||||
.action(ArgAction::SetTrue)
|
|
||||||
)
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::new("jobs")
|
Arg::new("jobs")
|
||||||
|
@ -2901,6 +2902,18 @@ Directory arguments are expanded to all contained files matching the glob
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parallel_arg(descr: &str, jobs_fallback: bool) -> Arg {
|
||||||
|
let arg = Arg::new("parallel")
|
||||||
|
.long("parallel")
|
||||||
|
.help(format!("Run {descr} in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable."))
|
||||||
|
.action(ArgAction::SetTrue);
|
||||||
|
if jobs_fallback {
|
||||||
|
arg.conflicts_with("jobs")
|
||||||
|
} else {
|
||||||
|
arg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn types_subcommand() -> Command {
|
fn types_subcommand() -> Command {
|
||||||
Command::new("types").about(
|
Command::new("types").about(
|
||||||
"Print runtime TypeScript declarations.
|
"Print runtime TypeScript declarations.
|
||||||
|
@ -4416,6 +4429,8 @@ fn serve_parse(
|
||||||
.remove_one::<String>("host")
|
.remove_one::<String>("host")
|
||||||
.unwrap_or_else(|| "0.0.0.0".to_owned());
|
.unwrap_or_else(|| "0.0.0.0".to_owned());
|
||||||
|
|
||||||
|
let worker_count = parallel_arg_parse(matches, false).map(|v| v.get());
|
||||||
|
|
||||||
runtime_args_parse(flags, matches, true, true);
|
runtime_args_parse(flags, matches, true, true);
|
||||||
// If the user didn't pass --allow-net, add this port to the network
|
// If the user didn't pass --allow-net, add this port to the network
|
||||||
// allowlist. If the host is 0.0.0.0, we add :{port} and allow the same network perms
|
// allowlist. If the host is 0.0.0.0, we add :{port} and allow the same network perms
|
||||||
|
@ -4455,6 +4470,7 @@ fn serve_parse(
|
||||||
watch: watch_arg_parse_with_paths(matches),
|
watch: watch_arg_parse_with_paths(matches),
|
||||||
port,
|
port,
|
||||||
host,
|
host,
|
||||||
|
worker_count,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -4486,6 +4502,42 @@ fn task_parse(flags: &mut Flags, matches: &mut ArgMatches) {
|
||||||
flags.subcommand = DenoSubcommand::Task(task_flags);
|
flags.subcommand = DenoSubcommand::Task(task_flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parallel_arg_parse(
|
||||||
|
matches: &mut ArgMatches,
|
||||||
|
fallback_to_jobs: bool,
|
||||||
|
) -> Option<NonZeroUsize> {
|
||||||
|
if matches.get_flag("parallel") {
|
||||||
|
if let Ok(value) = env::var("DENO_JOBS") {
|
||||||
|
value.parse::<NonZeroUsize>().ok()
|
||||||
|
} else {
|
||||||
|
std::thread::available_parallelism().ok()
|
||||||
|
}
|
||||||
|
} else if fallback_to_jobs && matches.contains_id("jobs") {
|
||||||
|
// We can't change this to use the log crate because its not configured
|
||||||
|
// yet at this point since the flags haven't been parsed. This flag is
|
||||||
|
// deprecated though so it's not worth changing the code to use the log
|
||||||
|
// crate here and this is only done for testing anyway.
|
||||||
|
#[allow(clippy::print_stderr)]
|
||||||
|
{
|
||||||
|
eprintln!(
|
||||||
|
"⚠️ {}",
|
||||||
|
crate::colors::yellow(concat!(
|
||||||
|
"The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n",
|
||||||
|
"Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n",
|
||||||
|
"Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables"
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(value) = matches.remove_one::<NonZeroUsize>("jobs") {
|
||||||
|
Some(value)
|
||||||
|
} else {
|
||||||
|
std::thread::available_parallelism().ok()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) {
|
fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) {
|
||||||
flags.type_check_mode = TypeCheckMode::Local;
|
flags.type_check_mode = TypeCheckMode::Local;
|
||||||
runtime_args_parse(flags, matches, true, true);
|
runtime_args_parse(flags, matches, true, true);
|
||||||
|
@ -4552,36 +4604,7 @@ fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) {
|
||||||
flags.argv.extend(script_arg);
|
flags.argv.extend(script_arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
let concurrent_jobs = if matches.get_flag("parallel") {
|
let concurrent_jobs = parallel_arg_parse(matches, true);
|
||||||
if let Ok(value) = env::var("DENO_JOBS") {
|
|
||||||
value.parse::<NonZeroUsize>().ok()
|
|
||||||
} else {
|
|
||||||
std::thread::available_parallelism().ok()
|
|
||||||
}
|
|
||||||
} else if matches.contains_id("jobs") {
|
|
||||||
// We can't change this to use the log crate because its not configured
|
|
||||||
// yet at this point since the flags haven't been parsed. This flag is
|
|
||||||
// deprecated though so it's not worth changing the code to use the log
|
|
||||||
// crate here and this is only done for testing anyway.
|
|
||||||
#[allow(clippy::print_stderr)]
|
|
||||||
{
|
|
||||||
eprintln!(
|
|
||||||
"⚠️ {}",
|
|
||||||
crate::colors::yellow(concat!(
|
|
||||||
"The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n",
|
|
||||||
"Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n",
|
|
||||||
"Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables"
|
|
||||||
)),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if let Some(value) = matches.remove_one::<NonZeroUsize>("jobs") {
|
|
||||||
Some(value)
|
|
||||||
} else {
|
|
||||||
std::thread::available_parallelism().ok()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let include = if let Some(files) = matches.remove_many::<String>("files") {
|
let include = if let Some(files) = matches.remove_many::<String>("files") {
|
||||||
files.collect()
|
files.collect()
|
||||||
|
|
|
@ -236,7 +236,7 @@ async fn run_subcommand(flags: Arc<Flags>) -> Result<i32, AnyError> {
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
DenoSubcommand::Serve(serve_flags) => spawn_subcommand(async move {
|
DenoSubcommand::Serve(serve_flags) => spawn_subcommand(async move {
|
||||||
tools::run::run_script(WorkerExecutionMode::Serve, flags, serve_flags.watch).await
|
tools::serve::serve(flags, serve_flags).await
|
||||||
}),
|
}),
|
||||||
DenoSubcommand::Task(task_flags) => spawn_subcommand(async {
|
DenoSubcommand::Task(task_flags) => spawn_subcommand(async {
|
||||||
tools::task::execute_script(flags, task_flags, false).await
|
tools::task::execute_script(flags, task_flags, false).await
|
||||||
|
|
|
@ -16,6 +16,7 @@ pub mod lint;
|
||||||
pub mod registry;
|
pub mod registry;
|
||||||
pub mod repl;
|
pub mod repl;
|
||||||
pub mod run;
|
pub mod run;
|
||||||
|
pub mod serve;
|
||||||
pub mod task;
|
pub mod task;
|
||||||
pub mod test;
|
pub mod test;
|
||||||
pub mod upgrade;
|
pub mod upgrade;
|
||||||
|
|
|
@ -18,11 +18,7 @@ use crate::util::file_watcher::WatcherRestartMode;
|
||||||
|
|
||||||
pub mod hmr;
|
pub mod hmr;
|
||||||
|
|
||||||
pub async fn run_script(
|
pub fn check_permission_before_script(flags: &Flags) {
|
||||||
mode: WorkerExecutionMode,
|
|
||||||
flags: Arc<Flags>,
|
|
||||||
watch: Option<WatchFlagsWithPaths>,
|
|
||||||
) -> Result<i32, AnyError> {
|
|
||||||
if !flags.has_permission() && flags.has_permission_in_argv() {
|
if !flags.has_permission() && flags.has_permission_in_argv() {
|
||||||
log::warn!(
|
log::warn!(
|
||||||
"{}",
|
"{}",
|
||||||
|
@ -33,6 +29,14 @@ To grant permissions, set them before the script argument. For example:
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run_script(
|
||||||
|
mode: WorkerExecutionMode,
|
||||||
|
flags: Arc<Flags>,
|
||||||
|
watch: Option<WatchFlagsWithPaths>,
|
||||||
|
) -> Result<i32, AnyError> {
|
||||||
|
check_permission_before_script(&flags);
|
||||||
|
|
||||||
if let Some(watch_flags) = watch {
|
if let Some(watch_flags) = watch {
|
||||||
return run_with_watch(mode, flags, watch_flags).await;
|
return run_with_watch(mode, flags, watch_flags).await;
|
||||||
|
@ -187,7 +191,7 @@ pub async fn eval_command(
|
||||||
Ok(exit_code)
|
Ok(exit_code)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> {
|
pub async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> {
|
||||||
// ensure an "npm install" is done if the user has explicitly
|
// ensure an "npm install" is done if the user has explicitly
|
||||||
// opted into using a managed node_modules directory
|
// opted into using a managed node_modules directory
|
||||||
if factory.cli_options()?.node_modules_dir_enablement() == Some(true) {
|
if factory.cli_options()?.node_modules_dir_enablement() == Some(true) {
|
||||||
|
|
192
cli/tools/serve.rs
Normal file
192
cli/tools/serve.rs
Normal file
|
@ -0,0 +1,192 @@
|
||||||
|
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use deno_core::error::AnyError;
|
||||||
|
use deno_core::futures::TryFutureExt;
|
||||||
|
use deno_core::ModuleSpecifier;
|
||||||
|
use deno_runtime::deno_permissions::Permissions;
|
||||||
|
use deno_runtime::deno_permissions::PermissionsContainer;
|
||||||
|
|
||||||
|
use super::run::check_permission_before_script;
|
||||||
|
use super::run::maybe_npm_install;
|
||||||
|
use crate::args::Flags;
|
||||||
|
use crate::args::ServeFlags;
|
||||||
|
use crate::args::WatchFlagsWithPaths;
|
||||||
|
use crate::factory::CliFactory;
|
||||||
|
use crate::util::file_watcher::WatcherRestartMode;
|
||||||
|
use crate::worker::CliMainWorkerFactory;
|
||||||
|
|
||||||
|
pub async fn serve(
|
||||||
|
flags: Arc<Flags>,
|
||||||
|
serve_flags: ServeFlags,
|
||||||
|
) -> Result<i32, AnyError> {
|
||||||
|
check_permission_before_script(&flags);
|
||||||
|
|
||||||
|
if let Some(watch_flags) = serve_flags.watch {
|
||||||
|
return serve_with_watch(flags, watch_flags, serve_flags.worker_count)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let factory = CliFactory::from_flags(flags);
|
||||||
|
let cli_options = factory.cli_options()?;
|
||||||
|
let deno_dir = factory.deno_dir()?;
|
||||||
|
let http_client = factory.http_client_provider();
|
||||||
|
|
||||||
|
// Run a background task that checks for available upgrades or output
|
||||||
|
// if an earlier run of this background task found a new version of Deno.
|
||||||
|
#[cfg(feature = "upgrade")]
|
||||||
|
super::upgrade::check_for_upgrades(
|
||||||
|
http_client.clone(),
|
||||||
|
deno_dir.upgrade_check_file_path(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let main_module = cli_options.resolve_main_module()?;
|
||||||
|
|
||||||
|
maybe_npm_install(&factory).await?;
|
||||||
|
|
||||||
|
let permissions = PermissionsContainer::new(Permissions::from_options(
|
||||||
|
&cli_options.permissions_options()?,
|
||||||
|
)?);
|
||||||
|
let worker_factory = factory.create_cli_main_worker_factory().await?;
|
||||||
|
|
||||||
|
do_serve(
|
||||||
|
worker_factory,
|
||||||
|
main_module,
|
||||||
|
permissions,
|
||||||
|
serve_flags.worker_count,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn do_serve(
|
||||||
|
worker_factory: CliMainWorkerFactory,
|
||||||
|
main_module: ModuleSpecifier,
|
||||||
|
permissions: PermissionsContainer,
|
||||||
|
worker_count: Option<usize>,
|
||||||
|
hmr: bool,
|
||||||
|
) -> Result<i32, AnyError> {
|
||||||
|
let mut worker = worker_factory
|
||||||
|
.create_main_worker(
|
||||||
|
deno_runtime::WorkerExecutionMode::Serve {
|
||||||
|
is_main: true,
|
||||||
|
worker_count,
|
||||||
|
},
|
||||||
|
main_module.clone(),
|
||||||
|
permissions.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let worker_count = match worker_count {
|
||||||
|
None | Some(1) => return worker.run().await,
|
||||||
|
Some(c) => c,
|
||||||
|
};
|
||||||
|
|
||||||
|
let main = deno_core::unsync::spawn(async move { worker.run().await });
|
||||||
|
|
||||||
|
let extra_workers = worker_count.saturating_sub(1);
|
||||||
|
|
||||||
|
let mut channels = Vec::with_capacity(extra_workers);
|
||||||
|
for i in 0..extra_workers {
|
||||||
|
let worker_factory = worker_factory.clone();
|
||||||
|
let main_module = main_module.clone();
|
||||||
|
let permissions = permissions.clone();
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
channels.push(rx);
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name(format!("serve-worker-{i}"))
|
||||||
|
.spawn(move || {
|
||||||
|
deno_runtime::tokio_util::create_and_run_current_thread(async move {
|
||||||
|
let result =
|
||||||
|
run_worker(i, worker_factory, main_module, permissions, hmr).await;
|
||||||
|
let _ = tx.send(result);
|
||||||
|
});
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (main_result, worker_results) = tokio::try_join!(
|
||||||
|
main.map_err(AnyError::from),
|
||||||
|
deno_core::futures::future::try_join_all(
|
||||||
|
channels.into_iter().map(|r| r.map_err(AnyError::from))
|
||||||
|
)
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut exit_code = main_result?;
|
||||||
|
for res in worker_results {
|
||||||
|
let ret = res?;
|
||||||
|
if ret != 0 && exit_code == 0 {
|
||||||
|
exit_code = ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(exit_code)
|
||||||
|
|
||||||
|
// main.await?
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_worker(
|
||||||
|
worker_count: usize,
|
||||||
|
worker_factory: CliMainWorkerFactory,
|
||||||
|
main_module: ModuleSpecifier,
|
||||||
|
permissions: PermissionsContainer,
|
||||||
|
hmr: bool,
|
||||||
|
) -> Result<i32, AnyError> {
|
||||||
|
let mut worker = worker_factory
|
||||||
|
.create_main_worker(
|
||||||
|
deno_runtime::WorkerExecutionMode::Serve {
|
||||||
|
is_main: false,
|
||||||
|
worker_count: Some(worker_count),
|
||||||
|
},
|
||||||
|
main_module,
|
||||||
|
permissions,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
if hmr {
|
||||||
|
worker.run_for_watcher().await?;
|
||||||
|
Ok(0)
|
||||||
|
} else {
|
||||||
|
worker.run().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn serve_with_watch(
|
||||||
|
flags: Arc<Flags>,
|
||||||
|
watch_flags: WatchFlagsWithPaths,
|
||||||
|
worker_count: Option<usize>,
|
||||||
|
) -> Result<i32, AnyError> {
|
||||||
|
let hmr = watch_flags.hmr;
|
||||||
|
crate::util::file_watcher::watch_recv(
|
||||||
|
flags,
|
||||||
|
crate::util::file_watcher::PrintConfig::new_with_banner(
|
||||||
|
if watch_flags.hmr { "HMR" } else { "Watcher" },
|
||||||
|
"Process",
|
||||||
|
!watch_flags.no_clear_screen,
|
||||||
|
),
|
||||||
|
WatcherRestartMode::Automatic,
|
||||||
|
move |flags, watcher_communicator, _changed_paths| {
|
||||||
|
Ok(async move {
|
||||||
|
let factory = CliFactory::from_flags_for_watcher(
|
||||||
|
flags,
|
||||||
|
watcher_communicator.clone(),
|
||||||
|
);
|
||||||
|
let cli_options = factory.cli_options()?;
|
||||||
|
let main_module = cli_options.resolve_main_module()?;
|
||||||
|
|
||||||
|
maybe_npm_install(&factory).await?;
|
||||||
|
|
||||||
|
let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
|
||||||
|
|
||||||
|
let permissions = PermissionsContainer::new(Permissions::from_options(
|
||||||
|
&cli_options.permissions_options()?,
|
||||||
|
)?);
|
||||||
|
let worker_factory = factory.create_cli_main_worker_factory().await?;
|
||||||
|
|
||||||
|
do_serve(worker_factory, main_module, permissions, worker_count, hmr)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(0)
|
||||||
|
}
|
|
@ -414,6 +414,7 @@ impl CliMainWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct CliMainWorkerFactory {
|
pub struct CliMainWorkerFactory {
|
||||||
shared: Arc<SharedWorkerState>,
|
shared: Arc<SharedWorkerState>,
|
||||||
}
|
}
|
||||||
|
@ -546,7 +547,7 @@ impl CliMainWorkerFactory {
|
||||||
let maybe_inspector_server = shared.maybe_inspector_server.clone();
|
let maybe_inspector_server = shared.maybe_inspector_server.clone();
|
||||||
|
|
||||||
let create_web_worker_cb =
|
let create_web_worker_cb =
|
||||||
create_web_worker_callback(mode, shared.clone(), stdio.clone());
|
create_web_worker_callback(shared.clone(), stdio.clone());
|
||||||
|
|
||||||
let maybe_storage_key = shared
|
let maybe_storage_key = shared
|
||||||
.storage_key_resolver
|
.storage_key_resolver
|
||||||
|
@ -739,7 +740,6 @@ impl CliMainWorkerFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_web_worker_callback(
|
fn create_web_worker_callback(
|
||||||
mode: WorkerExecutionMode,
|
|
||||||
shared: Arc<SharedWorkerState>,
|
shared: Arc<SharedWorkerState>,
|
||||||
stdio: deno_runtime::deno_io::Stdio,
|
stdio: deno_runtime::deno_io::Stdio,
|
||||||
) -> Arc<CreateWebWorkerCb> {
|
) -> Arc<CreateWebWorkerCb> {
|
||||||
|
@ -752,7 +752,7 @@ fn create_web_worker_callback(
|
||||||
args.permissions.clone(),
|
args.permissions.clone(),
|
||||||
);
|
);
|
||||||
let create_web_worker_cb =
|
let create_web_worker_cb =
|
||||||
create_web_worker_callback(mode, shared.clone(), stdio.clone());
|
create_web_worker_callback(shared.clone(), stdio.clone());
|
||||||
|
|
||||||
let maybe_storage_key = shared
|
let maybe_storage_key = shared
|
||||||
.storage_key_resolver
|
.storage_key_resolver
|
||||||
|
@ -802,7 +802,7 @@ fn create_web_worker_callback(
|
||||||
disable_deprecated_api_warning: shared.disable_deprecated_api_warning,
|
disable_deprecated_api_warning: shared.disable_deprecated_api_warning,
|
||||||
verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning,
|
verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning,
|
||||||
future: shared.enable_future_features,
|
future: shared.enable_future_features,
|
||||||
mode,
|
mode: WorkerExecutionMode::Worker,
|
||||||
serve_port: shared.serve_port,
|
serve_port: shared.serve_port,
|
||||||
serve_host: shared.serve_host.clone(),
|
serve_host: shared.serve_host.clone(),
|
||||||
},
|
},
|
||||||
|
|
|
@ -579,6 +579,8 @@ type RawServeOptions = {
|
||||||
handler?: RawHandler;
|
handler?: RawHandler;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const kLoadBalanced = Symbol("kLoadBalanced");
|
||||||
|
|
||||||
function serve(arg1, arg2) {
|
function serve(arg1, arg2) {
|
||||||
let options: RawServeOptions | undefined;
|
let options: RawServeOptions | undefined;
|
||||||
let handler: RawHandler | undefined;
|
let handler: RawHandler | undefined;
|
||||||
|
@ -634,6 +636,7 @@ function serve(arg1, arg2) {
|
||||||
hostname: options.hostname ?? "0.0.0.0",
|
hostname: options.hostname ?? "0.0.0.0",
|
||||||
port: options.port ?? 8000,
|
port: options.port ?? 8000,
|
||||||
reusePort: options.reusePort ?? false,
|
reusePort: options.reusePort ?? false,
|
||||||
|
loadBalanced: options[kLoadBalanced] ?? false,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (options.certFile || options.keyFile) {
|
if (options.certFile || options.keyFile) {
|
||||||
|
@ -842,18 +845,25 @@ function registerDeclarativeServer(exports) {
|
||||||
"Invalid type for fetch: must be a function with a single or no parameter",
|
"Invalid type for fetch: must be a function with a single or no parameter",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
return ({ servePort, serveHost }) => {
|
return ({ servePort, serveHost, serveIsMain, serveWorkerCount }) => {
|
||||||
Deno.serve({
|
Deno.serve({
|
||||||
port: servePort,
|
port: servePort,
|
||||||
hostname: serveHost,
|
hostname: serveHost,
|
||||||
|
[kLoadBalanced]: (serveIsMain && serveWorkerCount > 1) ||
|
||||||
|
(serveWorkerCount !== null),
|
||||||
onListen: ({ port, hostname }) => {
|
onListen: ({ port, hostname }) => {
|
||||||
|
if (serveIsMain) {
|
||||||
|
const nThreads = serveWorkerCount > 1
|
||||||
|
? ` with ${serveWorkerCount} threads`
|
||||||
|
: "";
|
||||||
console.debug(
|
console.debug(
|
||||||
`%cdeno serve%c: Listening on %chttp://${hostname}:${port}/%c`,
|
`%cdeno serve%c: Listening on %chttp://${hostname}:${port}/%c${nThreads}`,
|
||||||
"color: green",
|
"color: green",
|
||||||
"color: inherit",
|
"color: inherit",
|
||||||
"color: yellow",
|
"color: yellow",
|
||||||
"color: inherit",
|
"color: inherit",
|
||||||
);
|
);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
handler: (req) => {
|
handler: (req) => {
|
||||||
return exports.fetch(req);
|
return exports.fetch(req);
|
||||||
|
|
|
@ -531,10 +531,14 @@ const listenOptionApiName = Symbol("listenOptionApiName");
|
||||||
function listen(args) {
|
function listen(args) {
|
||||||
switch (args.transport ?? "tcp") {
|
switch (args.transport ?? "tcp") {
|
||||||
case "tcp": {
|
case "tcp": {
|
||||||
const { 0: rid, 1: addr } = op_net_listen_tcp({
|
const { 0: rid, 1: addr } = op_net_listen_tcp(
|
||||||
|
{
|
||||||
hostname: args.hostname ?? "0.0.0.0",
|
hostname: args.hostname ?? "0.0.0.0",
|
||||||
port: Number(args.port),
|
port: Number(args.port),
|
||||||
}, args.reusePort);
|
},
|
||||||
|
args.reusePort,
|
||||||
|
args.loadBalanced ?? false,
|
||||||
|
);
|
||||||
addr.transport = "tcp";
|
addr.transport = "tcp";
|
||||||
return new Listener(rid, addr);
|
return new Listener(rid, addr);
|
||||||
}
|
}
|
||||||
|
|
|
@ -353,6 +353,7 @@ pub fn op_net_listen_tcp<NP>(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
#[serde] addr: IpAddr,
|
#[serde] addr: IpAddr,
|
||||||
reuse_port: bool,
|
reuse_port: bool,
|
||||||
|
load_balanced: bool,
|
||||||
) -> Result<(ResourceId, IpAddr), AnyError>
|
) -> Result<(ResourceId, IpAddr), AnyError>
|
||||||
where
|
where
|
||||||
NP: NetPermissions + 'static,
|
NP: NetPermissions + 'static,
|
||||||
|
@ -367,7 +368,11 @@ where
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| generic_error("No resolved address found"))?;
|
.ok_or_else(|| generic_error("No resolved address found"))?;
|
||||||
|
|
||||||
let listener = TcpListener::bind_direct(addr, reuse_port)?;
|
let listener = if load_balanced {
|
||||||
|
TcpListener::bind_load_balanced(addr)
|
||||||
|
} else {
|
||||||
|
TcpListener::bind_direct(addr, reuse_port)
|
||||||
|
}?;
|
||||||
let local_addr = listener.local_addr()?;
|
let local_addr = listener.local_addr()?;
|
||||||
let listener_resource = NetworkListenerResource::new(listener);
|
let listener_resource = NetworkListenerResource::new(listener);
|
||||||
let rid = state.resource_table.add(listener_resource);
|
let rid = state.resource_table.add(listener_resource);
|
||||||
|
|
|
@ -475,6 +475,8 @@ fn load_private_keys_from_file(
|
||||||
pub struct ListenTlsArgs {
|
pub struct ListenTlsArgs {
|
||||||
alpn_protocols: Option<Vec<String>>,
|
alpn_protocols: Option<Vec<String>>,
|
||||||
reuse_port: bool,
|
reuse_port: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
load_balanced: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op2]
|
#[op2]
|
||||||
|
@ -502,7 +504,11 @@ where
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| generic_error("No resolved address found"))?;
|
.ok_or_else(|| generic_error("No resolved address found"))?;
|
||||||
|
|
||||||
let tcp_listener = TcpListener::bind_direct(bind_addr, args.reuse_port)?;
|
let tcp_listener = if args.load_balanced {
|
||||||
|
TcpListener::bind_load_balanced(bind_addr)
|
||||||
|
} else {
|
||||||
|
TcpListener::bind_direct(bind_addr, args.reuse_port)
|
||||||
|
}?;
|
||||||
let local_addr = tcp_listener.local_addr()?;
|
let local_addr = tcp_listener.local_addr()?;
|
||||||
let alpn = args
|
let alpn = args
|
||||||
.alpn_protocols
|
.alpn_protocols
|
||||||
|
|
|
@ -45,6 +45,7 @@ const {
|
||||||
PromiseResolve,
|
PromiseResolve,
|
||||||
SafeSet,
|
SafeSet,
|
||||||
StringPrototypeIncludes,
|
StringPrototypeIncludes,
|
||||||
|
StringPrototypePadEnd,
|
||||||
StringPrototypeSplit,
|
StringPrototypeSplit,
|
||||||
StringPrototypeTrim,
|
StringPrototypeTrim,
|
||||||
Symbol,
|
Symbol,
|
||||||
|
@ -709,8 +710,37 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
|
||||||
11: mode,
|
11: mode,
|
||||||
12: servePort,
|
12: servePort,
|
||||||
13: serveHost,
|
13: serveHost,
|
||||||
|
14: serveIsMain,
|
||||||
|
15: serveWorkerCount,
|
||||||
} = runtimeOptions;
|
} = runtimeOptions;
|
||||||
|
|
||||||
|
if (mode === executionModes.serve) {
|
||||||
|
if (serveIsMain && serveWorkerCount) {
|
||||||
|
const origLog = console.log;
|
||||||
|
const origError = console.error;
|
||||||
|
const prefix = `[serve-worker-0 ]`;
|
||||||
|
console.log = (...args) => {
|
||||||
|
return origLog(prefix, ...new primordials.SafeArrayIterator(args));
|
||||||
|
};
|
||||||
|
console.error = (...args) => {
|
||||||
|
return origError(prefix, ...new primordials.SafeArrayIterator(args));
|
||||||
|
};
|
||||||
|
} else if (serveWorkerCount !== null) {
|
||||||
|
const origLog = console.log;
|
||||||
|
const origError = console.error;
|
||||||
|
const base = `serve-worker-${serveWorkerCount + 1}`;
|
||||||
|
// 15 = "serve-worker-nn".length, assuming
|
||||||
|
// serveWorkerCount < 100
|
||||||
|
const prefix = `[${StringPrototypePadEnd(base, 15, " ")}]`;
|
||||||
|
console.log = (...args) => {
|
||||||
|
return origLog(prefix, ...new primordials.SafeArrayIterator(args));
|
||||||
|
};
|
||||||
|
console.error = (...args) => {
|
||||||
|
return origError(prefix, ...new primordials.SafeArrayIterator(args));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (mode === executionModes.run || mode === executionModes.serve) {
|
if (mode === executionModes.run || mode === executionModes.serve) {
|
||||||
let serve = undefined;
|
let serve = undefined;
|
||||||
core.addMainModuleHandler((main) => {
|
core.addMainModuleHandler((main) => {
|
||||||
|
@ -725,6 +755,8 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mode === executionModes.serve && !serve) {
|
if (mode === executionModes.serve && !serve) {
|
||||||
|
if (serveIsMain) {
|
||||||
|
// Only error if main worker
|
||||||
console.error(
|
console.error(
|
||||||
`%cerror: %cdeno serve requires %cexport default { fetch }%c in the main module, did you mean to run \"deno run\"?`,
|
`%cerror: %cdeno serve requires %cexport default { fetch }%c in the main module, did you mean to run \"deno run\"?`,
|
||||||
"color: yellow;",
|
"color: yellow;",
|
||||||
|
@ -732,6 +764,7 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
|
||||||
"font-weight: bold;",
|
"font-weight: bold;",
|
||||||
"font-weight: normal;",
|
"font-weight: normal;",
|
||||||
);
|
);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -746,7 +779,7 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if (mode === executionModes.serve) {
|
if (mode === executionModes.serve) {
|
||||||
serve({ servePort, serveHost });
|
serve({ servePort, serveHost, serveIsMain, serveWorkerCount });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -10,7 +10,6 @@ use deno_terminal::colors;
|
||||||
|
|
||||||
/// The execution mode for this worker. Some modes may have implicit behaviour.
|
/// The execution mode for this worker. Some modes may have implicit behaviour.
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
#[repr(u8)]
|
|
||||||
pub enum WorkerExecutionMode {
|
pub enum WorkerExecutionMode {
|
||||||
/// No special behaviour.
|
/// No special behaviour.
|
||||||
None,
|
None,
|
||||||
|
@ -28,11 +27,39 @@ pub enum WorkerExecutionMode {
|
||||||
/// `deno bench`
|
/// `deno bench`
|
||||||
Bench,
|
Bench,
|
||||||
/// `deno serve`
|
/// `deno serve`
|
||||||
Serve,
|
Serve {
|
||||||
|
is_main: bool,
|
||||||
|
worker_count: Option<usize>,
|
||||||
|
},
|
||||||
/// `deno jupyter`
|
/// `deno jupyter`
|
||||||
Jupyter,
|
Jupyter,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl WorkerExecutionMode {
|
||||||
|
pub fn discriminant(&self) -> u8 {
|
||||||
|
match self {
|
||||||
|
WorkerExecutionMode::None => 0,
|
||||||
|
WorkerExecutionMode::Worker => 1,
|
||||||
|
WorkerExecutionMode::Run => 2,
|
||||||
|
WorkerExecutionMode::Repl => 3,
|
||||||
|
WorkerExecutionMode::Eval => 4,
|
||||||
|
WorkerExecutionMode::Test => 5,
|
||||||
|
WorkerExecutionMode::Bench => 6,
|
||||||
|
WorkerExecutionMode::Serve { .. } => 7,
|
||||||
|
WorkerExecutionMode::Jupyter => 8,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn serve_info(&self) -> (Option<bool>, Option<usize>) {
|
||||||
|
match *self {
|
||||||
|
WorkerExecutionMode::Serve {
|
||||||
|
is_main,
|
||||||
|
worker_count,
|
||||||
|
} => (Some(is_main), worker_count),
|
||||||
|
_ => (None, None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The log level to use when printing diagnostic log messages, warnings,
|
/// The log level to use when printing diagnostic log messages, warnings,
|
||||||
/// or errors in the worker.
|
/// or errors in the worker.
|
||||||
///
|
///
|
||||||
|
@ -175,6 +202,10 @@ struct BootstrapV8<'a>(
|
||||||
u16,
|
u16,
|
||||||
// serve host
|
// serve host
|
||||||
Option<&'a str>,
|
Option<&'a str>,
|
||||||
|
// serve is main
|
||||||
|
Option<bool>,
|
||||||
|
// serve worker count
|
||||||
|
Option<usize>,
|
||||||
);
|
);
|
||||||
|
|
||||||
impl BootstrapOptions {
|
impl BootstrapOptions {
|
||||||
|
@ -186,6 +217,7 @@ impl BootstrapOptions {
|
||||||
let scope = RefCell::new(scope);
|
let scope = RefCell::new(scope);
|
||||||
let ser = deno_core::serde_v8::Serializer::new(&scope);
|
let ser = deno_core::serde_v8::Serializer::new(&scope);
|
||||||
|
|
||||||
|
let (serve_is_main, serve_worker_count) = self.mode.serve_info();
|
||||||
let bootstrap = BootstrapV8(
|
let bootstrap = BootstrapV8(
|
||||||
self.location.as_ref().map(|l| l.as_str()),
|
self.location.as_ref().map(|l| l.as_str()),
|
||||||
self.unstable,
|
self.unstable,
|
||||||
|
@ -198,9 +230,11 @@ impl BootstrapOptions {
|
||||||
self.disable_deprecated_api_warning,
|
self.disable_deprecated_api_warning,
|
||||||
self.verbose_deprecated_api_warning,
|
self.verbose_deprecated_api_warning,
|
||||||
self.future,
|
self.future,
|
||||||
self.mode as u8 as _,
|
self.mode.discriminant() as _,
|
||||||
self.serve_port.unwrap_or_default(),
|
self.serve_port.unwrap_or_default(),
|
||||||
self.serve_host.as_deref(),
|
self.serve_host.as_deref(),
|
||||||
|
serve_is_main,
|
||||||
|
serve_worker_count,
|
||||||
);
|
);
|
||||||
|
|
||||||
bootstrap.serialize(ser).unwrap()
|
bootstrap.serialize(ser).unwrap()
|
||||||
|
|
|
@ -1,93 +1,244 @@
|
||||||
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use pretty_assertions::assert_eq;
|
use pretty_assertions::assert_eq;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
|
use reqwest::RequestBuilder;
|
||||||
use test_util as util;
|
use test_util as util;
|
||||||
|
use test_util::DenoChild;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
#[tokio::test]
|
struct ServeClient {
|
||||||
async fn deno_serve_port_0() {
|
child: RefCell<DenoChild>,
|
||||||
let mut child = util::deno_cmd()
|
client: reqwest::Client,
|
||||||
|
output_buf: RefCell<Vec<u8>>,
|
||||||
|
endpoint: RefCell<Option<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for ServeClient {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let mut child = self.child.borrow_mut();
|
||||||
|
child.kill().unwrap();
|
||||||
|
child.wait().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ServeClientBuilder(util::TestCommandBuilder, Option<String>);
|
||||||
|
|
||||||
|
impl ServeClientBuilder {
|
||||||
|
fn build(self) -> ServeClient {
|
||||||
|
let Some(entry_point) = self.1 else {
|
||||||
|
panic!("entry point required");
|
||||||
|
};
|
||||||
|
let cmd = self.0.arg(entry_point);
|
||||||
|
let child = cmd.spawn().unwrap();
|
||||||
|
|
||||||
|
ServeClient::with_child(child)
|
||||||
|
}
|
||||||
|
fn map(
|
||||||
|
self,
|
||||||
|
f: impl FnOnce(util::TestCommandBuilder) -> util::TestCommandBuilder,
|
||||||
|
) -> Self {
|
||||||
|
Self(f(self.0), self.1)
|
||||||
|
}
|
||||||
|
fn entry_point(self, file: impl AsRef<str>) -> Self {
|
||||||
|
Self(self.0, Some(file.as_ref().into()))
|
||||||
|
}
|
||||||
|
fn worker_count(self, n: Option<u64>) -> Self {
|
||||||
|
self.map(|t| {
|
||||||
|
let t = t.arg("--parallel");
|
||||||
|
if let Some(n) = n {
|
||||||
|
t.env("DENO_JOBS", n.to_string())
|
||||||
|
} else {
|
||||||
|
t
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn new() -> Self {
|
||||||
|
Self(
|
||||||
|
util::deno_cmd()
|
||||||
.current_dir(util::testdata_path())
|
.current_dir(util::testdata_path())
|
||||||
.arg("serve")
|
.arg("serve")
|
||||||
.arg("--port")
|
.arg("--port")
|
||||||
.arg("0")
|
.arg("0")
|
||||||
.arg("./serve/port_0.ts")
|
.stdout_piped(),
|
||||||
.stdout_piped()
|
None,
|
||||||
.spawn()
|
)
|
||||||
.unwrap();
|
}
|
||||||
let stdout = child.stdout.as_mut().unwrap();
|
}
|
||||||
let mut buffer = [0; 52];
|
|
||||||
let _read = stdout.read(&mut buffer).unwrap();
|
|
||||||
let msg = std::str::from_utf8(&buffer).unwrap();
|
|
||||||
let port_regex = Regex::new(r":(\d+)").unwrap();
|
|
||||||
let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str();
|
|
||||||
|
|
||||||
let cert = reqwest::Certificate::from_pem(include_bytes!(
|
impl ServeClient {
|
||||||
|
fn builder() -> ServeClientBuilder {
|
||||||
|
ServeClientBuilder::new()
|
||||||
|
}
|
||||||
|
fn with_child(child: DenoChild) -> Self {
|
||||||
|
Self {
|
||||||
|
child: RefCell::new(child),
|
||||||
|
output_buf: Default::default(),
|
||||||
|
endpoint: Default::default(),
|
||||||
|
client: reqwest::Client::builder()
|
||||||
|
.add_root_certificate(
|
||||||
|
reqwest::Certificate::from_pem(include_bytes!(
|
||||||
"../testdata/tls/RootCA.crt"
|
"../testdata/tls/RootCA.crt"
|
||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap(),
|
||||||
|
)
|
||||||
let client = reqwest::Client::builder()
|
// disable connection pooling so we create a new connection per request
|
||||||
.add_root_certificate(cert)
|
// which allows us to distribute requests across workers
|
||||||
|
.pool_max_idle_per_host(0)
|
||||||
|
.pool_idle_timeout(Duration::from_nanos(1))
|
||||||
.http2_prior_knowledge()
|
.http2_prior_knowledge()
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let res = client
|
fn kill(self) {
|
||||||
.get(&format!("http://127.0.0.1:{port}"))
|
let mut child = self.child.borrow_mut();
|
||||||
.send()
|
child.kill().unwrap();
|
||||||
.await
|
child.wait().unwrap();
|
||||||
.unwrap();
|
}
|
||||||
|
|
||||||
|
fn output(self) -> String {
|
||||||
|
let mut child = self.child.borrow_mut();
|
||||||
|
child.kill().unwrap();
|
||||||
|
let mut stdout = child.stdout.take().unwrap();
|
||||||
|
child.wait().unwrap();
|
||||||
|
|
||||||
|
let mut output_buf = self.output_buf.borrow_mut();
|
||||||
|
|
||||||
|
stdout.read_to_end(&mut output_buf).unwrap();
|
||||||
|
|
||||||
|
String::from_utf8(std::mem::take(&mut *output_buf)).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self) -> RequestBuilder {
|
||||||
|
let endpoint = self.endpoint();
|
||||||
|
self.client.get(&*endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn endpoint(&self) -> String {
|
||||||
|
if let Some(e) = self.endpoint.borrow().as_ref() {
|
||||||
|
return e.to_string();
|
||||||
|
};
|
||||||
|
let mut buffer = self.output_buf.borrow_mut();
|
||||||
|
let mut temp_buf = [0u8; 64];
|
||||||
|
let mut child = self.child.borrow_mut();
|
||||||
|
let stdout = child.stdout.as_mut().unwrap();
|
||||||
|
let port_regex = regex::bytes::Regex::new(r":(\d+)").unwrap();
|
||||||
|
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
// try to find the port number in the output
|
||||||
|
// it may not be the first line, so we need to read the output in a loop
|
||||||
|
let port = loop {
|
||||||
|
if start.elapsed() > Duration::from_secs(5) {
|
||||||
|
panic!(
|
||||||
|
"timed out waiting for serve to start. serve output:\n{}",
|
||||||
|
String::from_utf8_lossy(&buffer)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let read = stdout.read(&mut temp_buf).unwrap();
|
||||||
|
buffer.extend_from_slice(&temp_buf[..read]);
|
||||||
|
if let Some(p) = port_regex
|
||||||
|
.captures(&buffer)
|
||||||
|
.and_then(|c| c.get(1))
|
||||||
|
.map(|v| std::str::from_utf8(v.as_bytes()).unwrap().to_owned())
|
||||||
|
{
|
||||||
|
break p;
|
||||||
|
}
|
||||||
|
// this is technically blocking, but it's just a test and
|
||||||
|
// I don't want to switch RefCell to Mutex just for this
|
||||||
|
std::thread::sleep(Duration::from_millis(10));
|
||||||
|
};
|
||||||
|
self
|
||||||
|
.endpoint
|
||||||
|
.replace(Some(format!("http://127.0.0.1:{port}")));
|
||||||
|
|
||||||
|
return self.endpoint.borrow().clone().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn deno_serve_port_0() {
|
||||||
|
let client = ServeClient::builder()
|
||||||
|
.entry_point("./serve/port_0.ts")
|
||||||
|
.build();
|
||||||
|
let res = client.get().send().await.unwrap();
|
||||||
assert_eq!(200, res.status());
|
assert_eq!(200, res.status());
|
||||||
|
|
||||||
let body = res.text().await.unwrap();
|
let body = res.text().await.unwrap();
|
||||||
assert_eq!(body, "deno serve --port 0 works!");
|
assert_eq!(body, "deno serve --port 0 works!");
|
||||||
|
client.kill();
|
||||||
child.kill().unwrap();
|
|
||||||
child.wait().unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn deno_serve_no_args() {
|
async fn deno_serve_no_args() {
|
||||||
let mut child = util::deno_cmd()
|
let client = ServeClient::builder()
|
||||||
.current_dir(util::testdata_path())
|
.entry_point("./serve/no_args.ts")
|
||||||
.arg("serve")
|
.build();
|
||||||
.arg("--port")
|
let res = client.get().send().await.unwrap();
|
||||||
.arg("0")
|
|
||||||
.arg("./serve/no_args.ts")
|
|
||||||
.stdout_piped()
|
|
||||||
.spawn()
|
|
||||||
.unwrap();
|
|
||||||
let stdout = child.stdout.as_mut().unwrap();
|
|
||||||
let mut buffer = [0; 52];
|
|
||||||
let _read = stdout.read(&mut buffer).unwrap();
|
|
||||||
let msg = std::str::from_utf8(&buffer).unwrap();
|
|
||||||
let port_regex = Regex::new(r":(\d+)").unwrap();
|
|
||||||
let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str();
|
|
||||||
|
|
||||||
let cert = reqwest::Certificate::from_pem(include_bytes!(
|
|
||||||
"../testdata/tls/RootCA.crt"
|
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let client = reqwest::Client::builder()
|
|
||||||
.add_root_certificate(cert)
|
|
||||||
.http2_prior_knowledge()
|
|
||||||
.build()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let res = client
|
|
||||||
.get(&format!("http://127.0.0.1:{port}"))
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(200, res.status());
|
assert_eq!(200, res.status());
|
||||||
|
|
||||||
let body = res.text().await.unwrap();
|
let body = res.text().await.unwrap();
|
||||||
assert_eq!(body, "deno serve with no args in fetch() works!");
|
assert_eq!(body, "deno serve with no args in fetch() works!");
|
||||||
|
}
|
||||||
child.kill().unwrap();
|
|
||||||
child.wait().unwrap();
|
#[tokio::test]
|
||||||
|
async fn deno_serve_parallel() {
|
||||||
|
let client = ServeClient::builder()
|
||||||
|
.entry_point("./serve/parallel.ts")
|
||||||
|
.worker_count(Some(4))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let mut serve_counts = HashMap::<u32, u32>::new();
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||||
|
|
||||||
|
let serve_regex =
|
||||||
|
Regex::new(r"\[serve\-worker\-(\d+)\s*\] serving request").unwrap();
|
||||||
|
|
||||||
|
for _ in 0..100 {
|
||||||
|
let response = timeout(Duration::from_secs(2), client.get().send())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(200, response.status());
|
||||||
|
let body = response.text().await.unwrap();
|
||||||
|
assert_eq!(body, "deno serve parallel");
|
||||||
|
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let output = client.output();
|
||||||
|
|
||||||
|
let listening_regex =
|
||||||
|
Regex::new(r"Listening on http[\w:/\.]+ with (\d+) threads").unwrap();
|
||||||
|
|
||||||
|
eprintln!("serve output:\n{output}");
|
||||||
|
assert_eq!(
|
||||||
|
listening_regex
|
||||||
|
.captures(&output)
|
||||||
|
.unwrap()
|
||||||
|
.get(1)
|
||||||
|
.unwrap()
|
||||||
|
.as_str()
|
||||||
|
.trim(),
|
||||||
|
"4"
|
||||||
|
);
|
||||||
|
|
||||||
|
for capture in serve_regex.captures_iter(&output) {
|
||||||
|
if let Some(worker_number) =
|
||||||
|
capture.get(1).and_then(|m| m.as_str().parse::<u32>().ok())
|
||||||
|
{
|
||||||
|
*serve_counts.entry(worker_number).or_default() += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
serve_counts.values().filter(|&&n| n > 2).count() >= 2,
|
||||||
|
"bad {serve_counts:?}"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
7
tests/testdata/serve/parallel.ts
vendored
Normal file
7
tests/testdata/serve/parallel.ts
vendored
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
console.log("starting serve");
|
||||||
|
export default {
|
||||||
|
fetch(_req: Request) {
|
||||||
|
console.log("serving request");
|
||||||
|
return new Response("deno serve parallel");
|
||||||
|
},
|
||||||
|
};
|
Loading…
Reference in a new issue