From e92a05b5518e5fd30559c96c5990b08657bbc3e4 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> Date: Wed, 14 Aug 2024 15:26:21 -0700 Subject: [PATCH] feat(serve): Opt-in parallelism for `deno serve` (#24920) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `parallel` flag to `deno serve`. When present, we spawn multiple workers to parallelize serving requests. ```bash deno serve --parallel main.ts ``` Currently on linux we use `SO_REUSEPORT` and rely on the fact that the kernel will distribute connections in a round-robin manner. On mac and windows, we sort of emulate this by cloning the underlying file descriptor and passing a handle to each worker. The connections will not be guaranteed to be fairly distributed (and in practice almost certainly won't be), but the distribution is still spread enough to provide a significant performance increase. --- (Run on an Macbook Pro with an M3 Max, serving `deno.com` baseline:: ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 239.78ms 13.56ms 330.54ms 79.12% Req/Sec 258.58 35.56 360.00 70.64% Latency Distribution 50% 236.72ms 75% 248.46ms 90% 256.84ms 99% 268.23ms 15458 requests in 30.02s, 2.47GB read Requests/sec: 514.89 Transfer/sec: 84.33MB ``` this PR (`with --parallel` flag) ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 117.40ms 142.84ms 590.45ms 79.07% Req/Sec 1.33k 175.19 1.77k 69.00% Latency Distribution 50% 22.34ms 75% 223.67ms 90% 357.32ms 99% 460.50ms 79636 requests in 30.07s, 12.74GB read Requests/sec: 2647.96 Transfer/sec: 433.71MB ``` --- Cargo.toml | 6 + cli/args/flags.rs | 93 ++++++---- cli/main.rs | 2 +- cli/tools/mod.rs | 1 + cli/tools/run/mod.rs | 16 +- cli/tools/serve.rs | 192 ++++++++++++++++++++ cli/worker.rs | 8 +- ext/http/00_serve.ts | 26 ++- ext/net/01_net.js | 12 +- ext/net/ops.rs | 7 +- ext/net/ops_tls.rs | 8 +- runtime/js/99_main.js | 49 +++++- runtime/worker_bootstrap.rs | 40 ++++- tests/integration/serve_tests.rs | 291 +++++++++++++++++++++++-------- tests/testdata/serve/parallel.ts | 7 + 15 files changed, 617 insertions(+), 141 deletions(-) create mode 100644 cli/tools/serve.rs create mode 100644 tests/testdata/serve/parallel.ts diff --git a/Cargo.toml b/Cargo.toml index 14437003e0..3032b40b5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -229,6 +229,12 @@ opt-level = 'z' # Optimize for size inherits = "release" debug = true +# Faster to compile than `release` but with similar performance. +[profile.release-lite] +inherits = "release" +codegen-units = 128 +lto = "thin" + # NB: the `bench` and `release` profiles must remain EXACTLY the same. [profile.bench] codegen-units = 1 diff --git a/cli/args/flags.rs b/cli/args/flags.rs index f8577ed1b8..800d6ff5a0 100644 --- a/cli/args/flags.rs +++ b/cli/args/flags.rs @@ -339,6 +339,7 @@ pub struct ServeFlags { pub watch: Option, pub port: u16, pub host: String, + pub worker_count: Option, } impl ServeFlags { @@ -349,6 +350,7 @@ impl ServeFlags { watch: None, port, host: host.to_owned(), + worker_count: None, } } } @@ -2693,6 +2695,9 @@ fn serve_subcommand() -> Command { .help("The TCP address to serve on, defaulting to 0.0.0.0 (all interfaces).") .value_parser(serve_host_validator), ) + .arg( + parallel_arg("multiple server workers", false) + ) .arg(check_arg(false)) .arg(watch_arg(true)) .arg(watch_exclude_arg()) @@ -2854,11 +2859,7 @@ Directory arguments are expanded to all contained files matching the glob .action(ArgAction::SetTrue), ) .arg( - Arg::new("parallel") - .long("parallel") - .help("Run test modules in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable.") - .conflicts_with("jobs") - .action(ArgAction::SetTrue) + parallel_arg("test modules", true) ) .arg( Arg::new("jobs") @@ -2901,6 +2902,18 @@ Directory arguments are expanded to all contained files matching the glob ) } +fn parallel_arg(descr: &str, jobs_fallback: bool) -> Arg { + let arg = Arg::new("parallel") + .long("parallel") + .help(format!("Run {descr} in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable.")) + .action(ArgAction::SetTrue); + if jobs_fallback { + arg.conflicts_with("jobs") + } else { + arg + } +} + fn types_subcommand() -> Command { Command::new("types").about( "Print runtime TypeScript declarations. @@ -4416,6 +4429,8 @@ fn serve_parse( .remove_one::("host") .unwrap_or_else(|| "0.0.0.0".to_owned()); + let worker_count = parallel_arg_parse(matches, false).map(|v| v.get()); + runtime_args_parse(flags, matches, true, true); // If the user didn't pass --allow-net, add this port to the network // allowlist. If the host is 0.0.0.0, we add :{port} and allow the same network perms @@ -4455,6 +4470,7 @@ fn serve_parse( watch: watch_arg_parse_with_paths(matches), port, host, + worker_count, }); Ok(()) @@ -4486,6 +4502,42 @@ fn task_parse(flags: &mut Flags, matches: &mut ArgMatches) { flags.subcommand = DenoSubcommand::Task(task_flags); } +fn parallel_arg_parse( + matches: &mut ArgMatches, + fallback_to_jobs: bool, +) -> Option { + if matches.get_flag("parallel") { + if let Ok(value) = env::var("DENO_JOBS") { + value.parse::().ok() + } else { + std::thread::available_parallelism().ok() + } + } else if fallback_to_jobs && matches.contains_id("jobs") { + // We can't change this to use the log crate because its not configured + // yet at this point since the flags haven't been parsed. This flag is + // deprecated though so it's not worth changing the code to use the log + // crate here and this is only done for testing anyway. + #[allow(clippy::print_stderr)] + { + eprintln!( + "⚠️ {}", + crate::colors::yellow(concat!( + "The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n", + "Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n", + "Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables" + )), + ); + } + if let Some(value) = matches.remove_one::("jobs") { + Some(value) + } else { + std::thread::available_parallelism().ok() + } + } else { + None + } +} + fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) { flags.type_check_mode = TypeCheckMode::Local; runtime_args_parse(flags, matches, true, true); @@ -4552,36 +4604,7 @@ fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) { flags.argv.extend(script_arg); } - let concurrent_jobs = if matches.get_flag("parallel") { - if let Ok(value) = env::var("DENO_JOBS") { - value.parse::().ok() - } else { - std::thread::available_parallelism().ok() - } - } else if matches.contains_id("jobs") { - // We can't change this to use the log crate because its not configured - // yet at this point since the flags haven't been parsed. This flag is - // deprecated though so it's not worth changing the code to use the log - // crate here and this is only done for testing anyway. - #[allow(clippy::print_stderr)] - { - eprintln!( - "⚠️ {}", - crate::colors::yellow(concat!( - "The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n", - "Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n", - "Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables" - )), - ); - } - if let Some(value) = matches.remove_one::("jobs") { - Some(value) - } else { - std::thread::available_parallelism().ok() - } - } else { - None - }; + let concurrent_jobs = parallel_arg_parse(matches, true); let include = if let Some(files) = matches.remove_many::("files") { files.collect() diff --git a/cli/main.rs b/cli/main.rs index e8ecaa3930..1752c3373b 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -236,7 +236,7 @@ async fn run_subcommand(flags: Arc) -> Result { } }), DenoSubcommand::Serve(serve_flags) => spawn_subcommand(async move { - tools::run::run_script(WorkerExecutionMode::Serve, flags, serve_flags.watch).await + tools::serve::serve(flags, serve_flags).await }), DenoSubcommand::Task(task_flags) => spawn_subcommand(async { tools::task::execute_script(flags, task_flags, false).await diff --git a/cli/tools/mod.rs b/cli/tools/mod.rs index 7bb9b7cf65..1e1c655658 100644 --- a/cli/tools/mod.rs +++ b/cli/tools/mod.rs @@ -16,6 +16,7 @@ pub mod lint; pub mod registry; pub mod repl; pub mod run; +pub mod serve; pub mod task; pub mod test; pub mod upgrade; diff --git a/cli/tools/run/mod.rs b/cli/tools/run/mod.rs index 65044fbad3..1964cfdd9a 100644 --- a/cli/tools/run/mod.rs +++ b/cli/tools/run/mod.rs @@ -18,11 +18,7 @@ use crate::util::file_watcher::WatcherRestartMode; pub mod hmr; -pub async fn run_script( - mode: WorkerExecutionMode, - flags: Arc, - watch: Option, -) -> Result { +pub fn check_permission_before_script(flags: &Flags) { if !flags.has_permission() && flags.has_permission_in_argv() { log::warn!( "{}", @@ -33,6 +29,14 @@ To grant permissions, set them before the script argument. For example: ) ); } +} + +pub async fn run_script( + mode: WorkerExecutionMode, + flags: Arc, + watch: Option, +) -> Result { + check_permission_before_script(&flags); if let Some(watch_flags) = watch { return run_with_watch(mode, flags, watch_flags).await; @@ -187,7 +191,7 @@ pub async fn eval_command( Ok(exit_code) } -async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> { +pub async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> { // ensure an "npm install" is done if the user has explicitly // opted into using a managed node_modules directory if factory.cli_options()?.node_modules_dir_enablement() == Some(true) { diff --git a/cli/tools/serve.rs b/cli/tools/serve.rs new file mode 100644 index 0000000000..24666b8f64 --- /dev/null +++ b/cli/tools/serve.rs @@ -0,0 +1,192 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use std::sync::Arc; + +use deno_core::error::AnyError; +use deno_core::futures::TryFutureExt; +use deno_core::ModuleSpecifier; +use deno_runtime::deno_permissions::Permissions; +use deno_runtime::deno_permissions::PermissionsContainer; + +use super::run::check_permission_before_script; +use super::run::maybe_npm_install; +use crate::args::Flags; +use crate::args::ServeFlags; +use crate::args::WatchFlagsWithPaths; +use crate::factory::CliFactory; +use crate::util::file_watcher::WatcherRestartMode; +use crate::worker::CliMainWorkerFactory; + +pub async fn serve( + flags: Arc, + serve_flags: ServeFlags, +) -> Result { + check_permission_before_script(&flags); + + if let Some(watch_flags) = serve_flags.watch { + return serve_with_watch(flags, watch_flags, serve_flags.worker_count) + .await; + } + + let factory = CliFactory::from_flags(flags); + let cli_options = factory.cli_options()?; + let deno_dir = factory.deno_dir()?; + let http_client = factory.http_client_provider(); + + // Run a background task that checks for available upgrades or output + // if an earlier run of this background task found a new version of Deno. + #[cfg(feature = "upgrade")] + super::upgrade::check_for_upgrades( + http_client.clone(), + deno_dir.upgrade_check_file_path(), + ); + + let main_module = cli_options.resolve_main_module()?; + + maybe_npm_install(&factory).await?; + + let permissions = PermissionsContainer::new(Permissions::from_options( + &cli_options.permissions_options()?, + )?); + let worker_factory = factory.create_cli_main_worker_factory().await?; + + do_serve( + worker_factory, + main_module, + permissions, + serve_flags.worker_count, + false, + ) + .await +} + +async fn do_serve( + worker_factory: CliMainWorkerFactory, + main_module: ModuleSpecifier, + permissions: PermissionsContainer, + worker_count: Option, + hmr: bool, +) -> Result { + let mut worker = worker_factory + .create_main_worker( + deno_runtime::WorkerExecutionMode::Serve { + is_main: true, + worker_count, + }, + main_module.clone(), + permissions.clone(), + ) + .await?; + let worker_count = match worker_count { + None | Some(1) => return worker.run().await, + Some(c) => c, + }; + + let main = deno_core::unsync::spawn(async move { worker.run().await }); + + let extra_workers = worker_count.saturating_sub(1); + + let mut channels = Vec::with_capacity(extra_workers); + for i in 0..extra_workers { + let worker_factory = worker_factory.clone(); + let main_module = main_module.clone(); + let permissions = permissions.clone(); + let (tx, rx) = tokio::sync::oneshot::channel(); + channels.push(rx); + std::thread::Builder::new() + .name(format!("serve-worker-{i}")) + .spawn(move || { + deno_runtime::tokio_util::create_and_run_current_thread(async move { + let result = + run_worker(i, worker_factory, main_module, permissions, hmr).await; + let _ = tx.send(result); + }); + })?; + } + + let (main_result, worker_results) = tokio::try_join!( + main.map_err(AnyError::from), + deno_core::futures::future::try_join_all( + channels.into_iter().map(|r| r.map_err(AnyError::from)) + ) + )?; + + let mut exit_code = main_result?; + for res in worker_results { + let ret = res?; + if ret != 0 && exit_code == 0 { + exit_code = ret; + } + } + Ok(exit_code) + + // main.await? +} + +async fn run_worker( + worker_count: usize, + worker_factory: CliMainWorkerFactory, + main_module: ModuleSpecifier, + permissions: PermissionsContainer, + hmr: bool, +) -> Result { + let mut worker = worker_factory + .create_main_worker( + deno_runtime::WorkerExecutionMode::Serve { + is_main: false, + worker_count: Some(worker_count), + }, + main_module, + permissions, + ) + .await?; + if hmr { + worker.run_for_watcher().await?; + Ok(0) + } else { + worker.run().await + } +} + +async fn serve_with_watch( + flags: Arc, + watch_flags: WatchFlagsWithPaths, + worker_count: Option, +) -> Result { + let hmr = watch_flags.hmr; + crate::util::file_watcher::watch_recv( + flags, + crate::util::file_watcher::PrintConfig::new_with_banner( + if watch_flags.hmr { "HMR" } else { "Watcher" }, + "Process", + !watch_flags.no_clear_screen, + ), + WatcherRestartMode::Automatic, + move |flags, watcher_communicator, _changed_paths| { + Ok(async move { + let factory = CliFactory::from_flags_for_watcher( + flags, + watcher_communicator.clone(), + ); + let cli_options = factory.cli_options()?; + let main_module = cli_options.resolve_main_module()?; + + maybe_npm_install(&factory).await?; + + let _ = watcher_communicator.watch_paths(cli_options.watch_paths()); + + let permissions = PermissionsContainer::new(Permissions::from_options( + &cli_options.permissions_options()?, + )?); + let worker_factory = factory.create_cli_main_worker_factory().await?; + + do_serve(worker_factory, main_module, permissions, worker_count, hmr) + .await?; + + Ok(()) + }) + }, + ) + .await?; + Ok(0) +} diff --git a/cli/worker.rs b/cli/worker.rs index 82051da6ce..7f04e8846d 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -414,6 +414,7 @@ impl CliMainWorker { } } +#[derive(Clone)] pub struct CliMainWorkerFactory { shared: Arc, } @@ -546,7 +547,7 @@ impl CliMainWorkerFactory { let maybe_inspector_server = shared.maybe_inspector_server.clone(); let create_web_worker_cb = - create_web_worker_callback(mode, shared.clone(), stdio.clone()); + create_web_worker_callback(shared.clone(), stdio.clone()); let maybe_storage_key = shared .storage_key_resolver @@ -739,7 +740,6 @@ impl CliMainWorkerFactory { } fn create_web_worker_callback( - mode: WorkerExecutionMode, shared: Arc, stdio: deno_runtime::deno_io::Stdio, ) -> Arc { @@ -752,7 +752,7 @@ fn create_web_worker_callback( args.permissions.clone(), ); let create_web_worker_cb = - create_web_worker_callback(mode, shared.clone(), stdio.clone()); + create_web_worker_callback(shared.clone(), stdio.clone()); let maybe_storage_key = shared .storage_key_resolver @@ -802,7 +802,7 @@ fn create_web_worker_callback( disable_deprecated_api_warning: shared.disable_deprecated_api_warning, verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning, future: shared.enable_future_features, - mode, + mode: WorkerExecutionMode::Worker, serve_port: shared.serve_port, serve_host: shared.serve_host.clone(), }, diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 8ed1a1d04a..9c6f805528 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -579,6 +579,8 @@ type RawServeOptions = { handler?: RawHandler; }; +const kLoadBalanced = Symbol("kLoadBalanced"); + function serve(arg1, arg2) { let options: RawServeOptions | undefined; let handler: RawHandler | undefined; @@ -634,6 +636,7 @@ function serve(arg1, arg2) { hostname: options.hostname ?? "0.0.0.0", port: options.port ?? 8000, reusePort: options.reusePort ?? false, + loadBalanced: options[kLoadBalanced] ?? false, }; if (options.certFile || options.keyFile) { @@ -842,18 +845,25 @@ function registerDeclarativeServer(exports) { "Invalid type for fetch: must be a function with a single or no parameter", ); } - return ({ servePort, serveHost }) => { + return ({ servePort, serveHost, serveIsMain, serveWorkerCount }) => { Deno.serve({ port: servePort, hostname: serveHost, + [kLoadBalanced]: (serveIsMain && serveWorkerCount > 1) || + (serveWorkerCount !== null), onListen: ({ port, hostname }) => { - console.debug( - `%cdeno serve%c: Listening on %chttp://${hostname}:${port}/%c`, - "color: green", - "color: inherit", - "color: yellow", - "color: inherit", - ); + if (serveIsMain) { + const nThreads = serveWorkerCount > 1 + ? ` with ${serveWorkerCount} threads` + : ""; + console.debug( + `%cdeno serve%c: Listening on %chttp://${hostname}:${port}/%c${nThreads}`, + "color: green", + "color: inherit", + "color: yellow", + "color: inherit", + ); + } }, handler: (req) => { return exports.fetch(req); diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 517ab127ee..536f79bbfa 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -531,10 +531,14 @@ const listenOptionApiName = Symbol("listenOptionApiName"); function listen(args) { switch (args.transport ?? "tcp") { case "tcp": { - const { 0: rid, 1: addr } = op_net_listen_tcp({ - hostname: args.hostname ?? "0.0.0.0", - port: Number(args.port), - }, args.reusePort); + const { 0: rid, 1: addr } = op_net_listen_tcp( + { + hostname: args.hostname ?? "0.0.0.0", + port: Number(args.port), + }, + args.reusePort, + args.loadBalanced ?? false, + ); addr.transport = "tcp"; return new Listener(rid, addr); } diff --git a/ext/net/ops.rs b/ext/net/ops.rs index f28778d29f..b74dc8d755 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -353,6 +353,7 @@ pub fn op_net_listen_tcp( state: &mut OpState, #[serde] addr: IpAddr, reuse_port: bool, + load_balanced: bool, ) -> Result<(ResourceId, IpAddr), AnyError> where NP: NetPermissions + 'static, @@ -367,7 +368,11 @@ where .next() .ok_or_else(|| generic_error("No resolved address found"))?; - let listener = TcpListener::bind_direct(addr, reuse_port)?; + let listener = if load_balanced { + TcpListener::bind_load_balanced(addr) + } else { + TcpListener::bind_direct(addr, reuse_port) + }?; let local_addr = listener.local_addr()?; let listener_resource = NetworkListenerResource::new(listener); let rid = state.resource_table.add(listener_resource); diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index a2a27c4adf..8483e7e668 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -475,6 +475,8 @@ fn load_private_keys_from_file( pub struct ListenTlsArgs { alpn_protocols: Option>, reuse_port: bool, + #[serde(default)] + load_balanced: bool, } #[op2] @@ -502,7 +504,11 @@ where .next() .ok_or_else(|| generic_error("No resolved address found"))?; - let tcp_listener = TcpListener::bind_direct(bind_addr, args.reuse_port)?; + let tcp_listener = if args.load_balanced { + TcpListener::bind_load_balanced(bind_addr) + } else { + TcpListener::bind_direct(bind_addr, args.reuse_port) + }?; let local_addr = tcp_listener.local_addr()?; let alpn = args .alpn_protocols diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index ca96e34b7a..5e25a3818c 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -45,6 +45,7 @@ const { PromiseResolve, SafeSet, StringPrototypeIncludes, + StringPrototypePadEnd, StringPrototypeSplit, StringPrototypeTrim, Symbol, @@ -709,8 +710,37 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) { 11: mode, 12: servePort, 13: serveHost, + 14: serveIsMain, + 15: serveWorkerCount, } = runtimeOptions; + if (mode === executionModes.serve) { + if (serveIsMain && serveWorkerCount) { + const origLog = console.log; + const origError = console.error; + const prefix = `[serve-worker-0 ]`; + console.log = (...args) => { + return origLog(prefix, ...new primordials.SafeArrayIterator(args)); + }; + console.error = (...args) => { + return origError(prefix, ...new primordials.SafeArrayIterator(args)); + }; + } else if (serveWorkerCount !== null) { + const origLog = console.log; + const origError = console.error; + const base = `serve-worker-${serveWorkerCount + 1}`; + // 15 = "serve-worker-nn".length, assuming + // serveWorkerCount < 100 + const prefix = `[${StringPrototypePadEnd(base, 15, " ")}]`; + console.log = (...args) => { + return origLog(prefix, ...new primordials.SafeArrayIterator(args)); + }; + console.error = (...args) => { + return origError(prefix, ...new primordials.SafeArrayIterator(args)); + }; + } + } + if (mode === executionModes.run || mode === executionModes.serve) { let serve = undefined; core.addMainModuleHandler((main) => { @@ -725,13 +755,16 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) { } if (mode === executionModes.serve && !serve) { - console.error( - `%cerror: %cdeno serve requires %cexport default { fetch }%c in the main module, did you mean to run \"deno run\"?`, - "color: yellow;", - "color: inherit;", - "font-weight: bold;", - "font-weight: normal;", - ); + if (serveIsMain) { + // Only error if main worker + console.error( + `%cerror: %cdeno serve requires %cexport default { fetch }%c in the main module, did you mean to run \"deno run\"?`, + "color: yellow;", + "color: inherit;", + "font-weight: bold;", + "font-weight: normal;", + ); + } return; } @@ -746,7 +779,7 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) { ); } if (mode === executionModes.serve) { - serve({ servePort, serveHost }); + serve({ servePort, serveHost, serveIsMain, serveWorkerCount }); } } }); diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs index b13c3c428f..afd3242e89 100644 --- a/runtime/worker_bootstrap.rs +++ b/runtime/worker_bootstrap.rs @@ -10,7 +10,6 @@ use deno_terminal::colors; /// The execution mode for this worker. Some modes may have implicit behaviour. #[derive(Copy, Clone)] -#[repr(u8)] pub enum WorkerExecutionMode { /// No special behaviour. None, @@ -28,11 +27,39 @@ pub enum WorkerExecutionMode { /// `deno bench` Bench, /// `deno serve` - Serve, + Serve { + is_main: bool, + worker_count: Option, + }, /// `deno jupyter` Jupyter, } +impl WorkerExecutionMode { + pub fn discriminant(&self) -> u8 { + match self { + WorkerExecutionMode::None => 0, + WorkerExecutionMode::Worker => 1, + WorkerExecutionMode::Run => 2, + WorkerExecutionMode::Repl => 3, + WorkerExecutionMode::Eval => 4, + WorkerExecutionMode::Test => 5, + WorkerExecutionMode::Bench => 6, + WorkerExecutionMode::Serve { .. } => 7, + WorkerExecutionMode::Jupyter => 8, + } + } + pub fn serve_info(&self) -> (Option, Option) { + match *self { + WorkerExecutionMode::Serve { + is_main, + worker_count, + } => (Some(is_main), worker_count), + _ => (None, None), + } + } +} + /// The log level to use when printing diagnostic log messages, warnings, /// or errors in the worker. /// @@ -175,6 +202,10 @@ struct BootstrapV8<'a>( u16, // serve host Option<&'a str>, + // serve is main + Option, + // serve worker count + Option, ); impl BootstrapOptions { @@ -186,6 +217,7 @@ impl BootstrapOptions { let scope = RefCell::new(scope); let ser = deno_core::serde_v8::Serializer::new(&scope); + let (serve_is_main, serve_worker_count) = self.mode.serve_info(); let bootstrap = BootstrapV8( self.location.as_ref().map(|l| l.as_str()), self.unstable, @@ -198,9 +230,11 @@ impl BootstrapOptions { self.disable_deprecated_api_warning, self.verbose_deprecated_api_warning, self.future, - self.mode as u8 as _, + self.mode.discriminant() as _, self.serve_port.unwrap_or_default(), self.serve_host.as_deref(), + serve_is_main, + serve_worker_count, ); bootstrap.serialize(ser).unwrap() diff --git a/tests/integration/serve_tests.rs b/tests/integration/serve_tests.rs index cfe7e4d6a8..f3d887ec27 100644 --- a/tests/integration/serve_tests.rs +++ b/tests/integration/serve_tests.rs @@ -1,93 +1,244 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use std::cell::RefCell; +use std::collections::HashMap; use std::io::Read; +use std::time::Duration; use pretty_assertions::assert_eq; use regex::Regex; +use reqwest::RequestBuilder; use test_util as util; +use test_util::DenoChild; +use tokio::time::timeout; + +struct ServeClient { + child: RefCell, + client: reqwest::Client, + output_buf: RefCell>, + endpoint: RefCell>, +} + +impl Drop for ServeClient { + fn drop(&mut self) { + let mut child = self.child.borrow_mut(); + child.kill().unwrap(); + child.wait().unwrap(); + } +} + +struct ServeClientBuilder(util::TestCommandBuilder, Option); + +impl ServeClientBuilder { + fn build(self) -> ServeClient { + let Some(entry_point) = self.1 else { + panic!("entry point required"); + }; + let cmd = self.0.arg(entry_point); + let child = cmd.spawn().unwrap(); + + ServeClient::with_child(child) + } + fn map( + self, + f: impl FnOnce(util::TestCommandBuilder) -> util::TestCommandBuilder, + ) -> Self { + Self(f(self.0), self.1) + } + fn entry_point(self, file: impl AsRef) -> Self { + Self(self.0, Some(file.as_ref().into())) + } + fn worker_count(self, n: Option) -> Self { + self.map(|t| { + let t = t.arg("--parallel"); + if let Some(n) = n { + t.env("DENO_JOBS", n.to_string()) + } else { + t + } + }) + } + fn new() -> Self { + Self( + util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("serve") + .arg("--port") + .arg("0") + .stdout_piped(), + None, + ) + } +} + +impl ServeClient { + fn builder() -> ServeClientBuilder { + ServeClientBuilder::new() + } + fn with_child(child: DenoChild) -> Self { + Self { + child: RefCell::new(child), + output_buf: Default::default(), + endpoint: Default::default(), + client: reqwest::Client::builder() + .add_root_certificate( + reqwest::Certificate::from_pem(include_bytes!( + "../testdata/tls/RootCA.crt" + )) + .unwrap(), + ) + // disable connection pooling so we create a new connection per request + // which allows us to distribute requests across workers + .pool_max_idle_per_host(0) + .pool_idle_timeout(Duration::from_nanos(1)) + .http2_prior_knowledge() + .build() + .unwrap(), + } + } + + fn kill(self) { + let mut child = self.child.borrow_mut(); + child.kill().unwrap(); + child.wait().unwrap(); + } + + fn output(self) -> String { + let mut child = self.child.borrow_mut(); + child.kill().unwrap(); + let mut stdout = child.stdout.take().unwrap(); + child.wait().unwrap(); + + let mut output_buf = self.output_buf.borrow_mut(); + + stdout.read_to_end(&mut output_buf).unwrap(); + + String::from_utf8(std::mem::take(&mut *output_buf)).unwrap() + } + + fn get(&self) -> RequestBuilder { + let endpoint = self.endpoint(); + self.client.get(&*endpoint) + } + + fn endpoint(&self) -> String { + if let Some(e) = self.endpoint.borrow().as_ref() { + return e.to_string(); + }; + let mut buffer = self.output_buf.borrow_mut(); + let mut temp_buf = [0u8; 64]; + let mut child = self.child.borrow_mut(); + let stdout = child.stdout.as_mut().unwrap(); + let port_regex = regex::bytes::Regex::new(r":(\d+)").unwrap(); + + let start = std::time::Instant::now(); + // try to find the port number in the output + // it may not be the first line, so we need to read the output in a loop + let port = loop { + if start.elapsed() > Duration::from_secs(5) { + panic!( + "timed out waiting for serve to start. serve output:\n{}", + String::from_utf8_lossy(&buffer) + ); + } + let read = stdout.read(&mut temp_buf).unwrap(); + buffer.extend_from_slice(&temp_buf[..read]); + if let Some(p) = port_regex + .captures(&buffer) + .and_then(|c| c.get(1)) + .map(|v| std::str::from_utf8(v.as_bytes()).unwrap().to_owned()) + { + break p; + } + // this is technically blocking, but it's just a test and + // I don't want to switch RefCell to Mutex just for this + std::thread::sleep(Duration::from_millis(10)); + }; + self + .endpoint + .replace(Some(format!("http://127.0.0.1:{port}"))); + + return self.endpoint.borrow().clone().unwrap(); + } +} #[tokio::test] async fn deno_serve_port_0() { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("serve") - .arg("--port") - .arg("0") - .arg("./serve/port_0.ts") - .stdout_piped() - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut buffer = [0; 52]; - let _read = stdout.read(&mut buffer).unwrap(); - let msg = std::str::from_utf8(&buffer).unwrap(); - let port_regex = Regex::new(r":(\d+)").unwrap(); - let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str(); - - let cert = reqwest::Certificate::from_pem(include_bytes!( - "../testdata/tls/RootCA.crt" - )) - .unwrap(); - - let client = reqwest::Client::builder() - .add_root_certificate(cert) - .http2_prior_knowledge() - .build() - .unwrap(); - - let res = client - .get(&format!("http://127.0.0.1:{port}")) - .send() - .await - .unwrap(); + let client = ServeClient::builder() + .entry_point("./serve/port_0.ts") + .build(); + let res = client.get().send().await.unwrap(); assert_eq!(200, res.status()); let body = res.text().await.unwrap(); assert_eq!(body, "deno serve --port 0 works!"); - - child.kill().unwrap(); - child.wait().unwrap(); + client.kill(); } #[tokio::test] async fn deno_serve_no_args() { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("serve") - .arg("--port") - .arg("0") - .arg("./serve/no_args.ts") - .stdout_piped() - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut buffer = [0; 52]; - let _read = stdout.read(&mut buffer).unwrap(); - let msg = std::str::from_utf8(&buffer).unwrap(); - let port_regex = Regex::new(r":(\d+)").unwrap(); - let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str(); - - let cert = reqwest::Certificate::from_pem(include_bytes!( - "../testdata/tls/RootCA.crt" - )) - .unwrap(); - - let client = reqwest::Client::builder() - .add_root_certificate(cert) - .http2_prior_knowledge() - .build() - .unwrap(); - - let res = client - .get(&format!("http://127.0.0.1:{port}")) - .send() - .await - .unwrap(); + let client = ServeClient::builder() + .entry_point("./serve/no_args.ts") + .build(); + let res = client.get().send().await.unwrap(); assert_eq!(200, res.status()); let body = res.text().await.unwrap(); assert_eq!(body, "deno serve with no args in fetch() works!"); - - child.kill().unwrap(); - child.wait().unwrap(); +} + +#[tokio::test] +async fn deno_serve_parallel() { + let client = ServeClient::builder() + .entry_point("./serve/parallel.ts") + .worker_count(Some(4)) + .build(); + + let mut serve_counts = HashMap::::new(); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + let serve_regex = + Regex::new(r"\[serve\-worker\-(\d+)\s*\] serving request").unwrap(); + + for _ in 0..100 { + let response = timeout(Duration::from_secs(2), client.get().send()) + .await + .unwrap() + .unwrap(); + assert_eq!(200, response.status()); + let body = response.text().await.unwrap(); + assert_eq!(body, "deno serve parallel"); + tokio::time::sleep(Duration::from_millis(1)).await; + } + + let output = client.output(); + + let listening_regex = + Regex::new(r"Listening on http[\w:/\.]+ with (\d+) threads").unwrap(); + + eprintln!("serve output:\n{output}"); + assert_eq!( + listening_regex + .captures(&output) + .unwrap() + .get(1) + .unwrap() + .as_str() + .trim(), + "4" + ); + + for capture in serve_regex.captures_iter(&output) { + if let Some(worker_number) = + capture.get(1).and_then(|m| m.as_str().parse::().ok()) + { + *serve_counts.entry(worker_number).or_default() += 1; + } + } + + assert!( + serve_counts.values().filter(|&&n| n > 2).count() >= 2, + "bad {serve_counts:?}" + ); } diff --git a/tests/testdata/serve/parallel.ts b/tests/testdata/serve/parallel.ts new file mode 100644 index 0000000000..f1f118c71e --- /dev/null +++ b/tests/testdata/serve/parallel.ts @@ -0,0 +1,7 @@ +console.log("starting serve"); +export default { + fetch(_req: Request) { + console.log("serving request"); + return new Response("deno serve parallel"); + }, +};