diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index fd60e85b70..2671069135 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -3642,3 +3642,9 @@ itest!(no_lock_flag { http_server: true, exit_code: 0, }); + +// Check https://github.com/denoland/deno_std/issues/2882 +itest!(flash_shutdown { + args: "run --unstable --allow-net run/flash_shutdown/main.ts", + exit_code: 0, +}); diff --git a/cli/tests/integration/watcher_tests.rs b/cli/tests/integration/watcher_tests.rs index 6e3319b11f..58f7e11fa1 100644 --- a/cli/tests/integration/watcher_tests.rs +++ b/cli/tests/integration/watcher_tests.rs @@ -1167,68 +1167,3 @@ fn run_watch_dynamic_imports() { check_alive_then_kill(child); } - -// https://github.com/denoland/deno/issues/16267 -#[test] -fn run_watch_flash() { - let filename = "watch_flash.js"; - let t = TempDir::new(); - let file_to_watch = t.path().join(filename); - write( - &file_to_watch, - r#" - console.log("Starting flash server..."); - Deno.serve({ - onListen() { - console.error("First server is listening"); - }, - handler: () => {}, - port: 4601, - }); - "#, - ) - .unwrap(); - - let mut child = util::deno_cmd() - .current_dir(t.path()) - .arg("run") - .arg("--watch") - .arg("--unstable") - .arg("--allow-net") - .arg("-L") - .arg("debug") - .arg(&file_to_watch) - .env("NO_COLOR", "1") - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .unwrap(); - let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child); - - wait_contains("Starting flash server...", &mut stdout_lines); - wait_for( - |m| m.contains("Watching paths") && m.contains(filename), - &mut stderr_lines, - ); - - write( - &file_to_watch, - r#" - console.log("Restarting flash server..."); - Deno.serve({ - onListen() { - console.error("Second server is listening"); - }, - handler: () => {}, - port: 4601, - }); - "#, - ) - .unwrap(); - - wait_contains("File change detected! Restarting!", &mut stderr_lines); - wait_contains("Restarting flash server...", &mut stdout_lines); - wait_contains("Second server is listening", &mut stderr_lines); - - check_alive_then_kill(child); -} diff --git a/cli/tests/testdata/run/flash_shutdown/main.ts b/cli/tests/testdata/run/flash_shutdown/main.ts new file mode 100644 index 0000000000..7f6985e347 --- /dev/null +++ b/cli/tests/testdata/run/flash_shutdown/main.ts @@ -0,0 +1,23 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +// Deno.serve caused segfault with this example after #16383 +// refs: +// - https://github.com/denoland/deno/pull/16383 +// - https://github.com/denoland/deno_std/issues/2882 +// - revert https://github.com/denoland/deno/pull/16610 + +const ctl = new AbortController(); +Deno.serve(() => + new Promise((resolve) => { + resolve(new Response(new TextEncoder().encode("ok"))); + ctl.abort(); + }), { + signal: ctl.signal, + async onListen({ port }) { + const a = await fetch(`http://localhost:${port}`, { + method: "POST", + body: "", + }); + await a.text(); + }, +}); diff --git a/cli/tests/unit/flash_test.ts b/cli/tests/unit/flash_test.ts index c64a7fe5a9..0240694553 100644 --- a/cli/tests/unit/flash_test.ts +++ b/cli/tests/unit/flash_test.ts @@ -70,7 +70,6 @@ Deno.test(async function httpServerRejectsOnAddrInUse() { onError: createOnErrorCb(ac), }); - await listeningPromise; assertRejects( () => Deno.serve({ diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index 8eed6047e5..7a6b9bc47e 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -188,8 +188,8 @@ return str; } - function prepareFastCalls(serverId) { - return core.ops.op_flash_make_request(serverId); + function prepareFastCalls() { + return core.ops.op_flash_make_request(); } function hostnameForDisplay(hostname) { @@ -482,11 +482,15 @@ const serverId = opFn(listenOpts); const serverPromise = core.opAsync("op_flash_drive_server", serverId); - const listenPromise = PromisePrototypeThen( - core.opAsync("op_flash_wait_for_listening", serverId), - (port) => { - onListen({ hostname: listenOpts.hostname, port }); - }, + + PromisePrototypeCatch( + PromisePrototypeThen( + core.opAsync("op_flash_wait_for_listening", serverId), + (port) => { + onListen({ hostname: listenOpts.hostname, port }); + }, + ), + () => {}, ); const finishedPromise = PromisePrototypeCatch(serverPromise, () => {}); @@ -502,7 +506,7 @@ return; } server.closed = true; - core.ops.op_flash_close_server(serverId); + await core.opAsync("op_flash_close_server", serverId); await server.finished; }, async serve() { @@ -614,7 +618,7 @@ signal?.addEventListener("abort", () => { clearInterval(dateInterval); - server.close(); + PromisePrototypeThen(server.close(), () => {}, () => {}); }, { once: true, }); @@ -629,7 +633,7 @@ ); } - const fastOp = prepareFastCalls(serverId); + const fastOp = prepareFastCalls(); let nextRequestSync = () => fastOp.nextRequest(); let getMethodSync = (token) => fastOp.getMethod(token); let respondFast = (token, response, shutdown) => @@ -649,8 +653,8 @@ } await SafePromiseAll([ - listenPromise, PromisePrototypeCatch(server.serve(), console.error), + serverPromise, ]); }; } diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index 41d88b3a02..b077b8d219 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -35,7 +35,6 @@ use mio::Events; use mio::Interest; use mio::Poll; use mio::Token; -use mio::Waker; use serde::Deserialize; use serde::Serialize; use socket2::Socket; @@ -56,6 +55,7 @@ use std::rc::Rc; use std::sync::Arc; use std::sync::Mutex; use std::task::Context; +use std::time::Duration; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -76,24 +76,15 @@ pub struct FlashContext { pub servers: HashMap, } -impl Drop for FlashContext { - fn drop(&mut self) { - // Signal each server instance to shutdown. - for (_, server) in self.servers.drain() { - let _ = server.waker.wake(); - } - } -} - pub struct ServerContext { _addr: SocketAddr, tx: mpsc::Sender, - rx: Option>, + rx: mpsc::Receiver, requests: HashMap, next_token: u32, - listening_rx: Option>>, + listening_rx: Option>, + close_tx: mpsc::Sender<()>, cancel_handle: Rc, - waker: Arc, } #[derive(Debug, Eq, PartialEq)] @@ -111,10 +102,7 @@ fn op_flash_respond( shutdown: bool, ) -> u32 { let flash_ctx = op_state.borrow_mut::(); - let ctx = match flash_ctx.servers.get_mut(&server_id) { - Some(ctx) => ctx, - None => return 0, - }; + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); flash_respond(ctx, token, shutdown, &response) } @@ -132,10 +120,7 @@ async fn op_flash_respond_async( let sock = { let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); - let ctx = match flash_ctx.servers.get_mut(&server_id) { - Some(ctx) => ctx, - None => return Ok(()), - }; + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); match shutdown { true => { @@ -399,30 +384,15 @@ fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 { } #[op] -fn op_flash_drive_server( - state: &mut OpState, - server_id: u32, -) -> Result> + 'static, AnyError> { - let join_handle = { - let flash_ctx = state.borrow_mut::(); - flash_ctx - .join_handles - .remove(&server_id) - .ok_or_else(|| type_error("server not found"))? +async fn op_flash_close_server(state: Rc>, server_id: u32) { + let close_tx = { + let mut op_state = state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + ctx.cancel_handle.cancel(); + ctx.close_tx.clone() }; - Ok(async move { - join_handle - .await - .map_err(|_| type_error("server join error"))??; - Ok(()) - }) -} - -#[op] -fn op_flash_close_server(state: &mut OpState, server_id: u32) { - let flash_ctx = state.borrow_mut::(); - let ctx = flash_ctx.servers.remove(&server_id).unwrap(); - let _ = ctx.waker.wake(); + let _ = close_tx.send(()).await; } #[op] @@ -449,7 +419,7 @@ fn op_flash_path( fn next_request_sync(ctx: &mut ServerContext) -> u32 { let offset = ctx.next_token; - while let Ok(token) = ctx.rx.as_mut().unwrap().try_recv() { + while let Ok(token) = ctx.rx.try_recv() { ctx.requests.insert(ctx.next_token, token); ctx.next_token += 1; } @@ -512,7 +482,6 @@ unsafe fn op_flash_get_method_fast( fn op_flash_make_request<'scope>( scope: &mut v8::HandleScope<'scope>, state: &mut OpState, - server_id: u32, ) -> serde_v8::Value<'scope> { let object_template = v8::ObjectTemplate::new(scope); assert!(object_template @@ -520,7 +489,7 @@ fn op_flash_make_request<'scope>( let obj = object_template.new_instance(scope).unwrap(); let ctx = { let flash_ctx = state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + let ctx = flash_ctx.servers.get_mut(&0).unwrap(); ctx as *mut ServerContext }; obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _); @@ -736,10 +705,7 @@ async fn op_flash_read_body( { let op_state = &mut state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); - match flash_ctx.servers.get_mut(&server_id) { - Some(ctx) => ctx as *mut ServerContext, - None => return 0, - } + flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext } .as_mut() .unwrap() @@ -841,40 +807,41 @@ pub struct ListenOpts { reuseport: bool, } -const SERVER_TOKEN: Token = Token(0); -// Token reserved for the thread close signal. -const WAKER_TOKEN: Token = Token(1); - -#[allow(clippy::too_many_arguments)] fn run_server( tx: mpsc::Sender, - listening_tx: mpsc::Sender>, + listening_tx: mpsc::Sender, + mut close_rx: mpsc::Receiver<()>, addr: SocketAddr, maybe_cert: Option, maybe_key: Option, reuseport: bool, - mut poll: Poll, - // We put a waker as an unused argument here as it needs to be alive both in - // the flash thread and in the main thread (otherwise the notification would - // not be caught by the event loop on Linux). - // See the comment in mio's example: - // https://docs.rs/mio/0.8.4/x86_64-unknown-linux-gnu/mio/struct.Waker.html#examples - _waker: Arc, ) -> Result<(), AnyError> { - let mut listener = match listen(addr, reuseport) { - Ok(listener) => listener, - Err(e) => { - listening_tx.blocking_send(Err(e)).unwrap(); - return Err(generic_error( - "failed to start listening on the specified address", - )); - } + let domain = if addr.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 }; + let socket = Socket::new(domain, socket2::Type::STREAM, None)?; - // Register server. + #[cfg(not(windows))] + socket.set_reuse_address(true)?; + if reuseport { + #[cfg(target_os = "linux")] + socket.set_reuse_port(true)?; + } + + let socket_addr = socket2::SockAddr::from(addr); + socket.bind(&socket_addr)?; + socket.listen(128)?; + socket.set_nonblocking(true)?; + let std_listener: std::net::TcpListener = socket.into(); + let mut listener = TcpListener::from_std(std_listener); + + let mut poll = Poll::new()?; + let token = Token(0); poll .registry() - .register(&mut listener, SERVER_TOKEN, Interest::READABLE) + .register(&mut listener, token, Interest::READABLE) .unwrap(); let tls_context: Option> = { @@ -896,24 +863,30 @@ fn run_server( }; listening_tx - .blocking_send(Ok(listener.local_addr().unwrap().port())) + .blocking_send(listener.local_addr().unwrap().port()) .unwrap(); let mut sockets = HashMap::with_capacity(1000); - let mut counter: usize = 2; + let mut counter: usize = 1; let mut events = Events::with_capacity(1024); 'outer: loop { - match poll.poll(&mut events, None) { + let result = close_rx.try_recv(); + if result.is_ok() { + break 'outer; + } + // FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms + // timeout here to handle close signal. + match poll.poll(&mut events, Some(Duration::from_millis(100))) { Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, Err(e) => panic!("{}", e), Ok(()) => (), } 'events: for event in &events { + if close_rx.try_recv().is_ok() { + break 'outer; + } let token = event.token(); match token { - WAKER_TOKEN => { - break 'outer; - } - SERVER_TOKEN => loop { + Token(0) => loop { match listener.accept() { Ok((mut socket, _)) => { counter += 1; @@ -1176,33 +1149,6 @@ fn run_server( Ok(()) } -#[inline] -fn listen( - addr: SocketAddr, - reuseport: bool, -) -> Result { - let domain = if addr.is_ipv4() { - socket2::Domain::IPV4 - } else { - socket2::Domain::IPV6 - }; - let socket = Socket::new(domain, socket2::Type::STREAM, None)?; - - #[cfg(not(windows))] - socket.set_reuse_address(true)?; - if reuseport { - #[cfg(target_os = "linux")] - socket.set_reuse_port(true)?; - } - - let socket_addr = socket2::SockAddr::from(addr); - socket.bind(&socket_addr)?; - socket.listen(128)?; - socket.set_nonblocking(true)?; - let std_listener: std::net::TcpListener = socket.into(); - Ok(TcpListener::from_std(std_listener)) -} - fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { // Default to localhost if given just the port. Example: ":80" if hostname.is_empty() { @@ -1240,19 +1186,17 @@ where .next() .ok_or_else(|| generic_error("No resolved address found"))?; let (tx, rx) = mpsc::channel(100); + let (close_tx, close_rx) = mpsc::channel(1); let (listening_tx, listening_rx) = mpsc::channel(1); - - let poll = Poll::new()?; - let waker = Arc::new(Waker::new(poll.registry(), WAKER_TOKEN).unwrap()); let ctx = ServerContext { _addr: addr, tx, - rx: Some(rx), + rx, requests: HashMap::with_capacity(1000), next_token: 0, + close_tx, listening_rx: Some(listening_rx), cancel_handle: CancelHandle::new_rc(), - waker: waker.clone(), }; let tx = ctx.tx.clone(); let maybe_cert = opts.cert; @@ -1262,12 +1206,11 @@ where run_server( tx, listening_tx, + close_rx, addr, maybe_cert, maybe_key, reuseport, - poll, - waker, ) }); let flash_ctx = state.borrow_mut::(); @@ -1302,26 +1245,45 @@ where } #[op] -async fn op_flash_wait_for_listening( - state: Rc>, +fn op_flash_wait_for_listening( + state: &mut OpState, server_id: u32, -) -> Result { +) -> Result> + 'static, AnyError> { let mut listening_rx = { - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); + let flash_ctx = state.borrow_mut::(); let server_ctx = flash_ctx .servers .get_mut(&server_id) .ok_or_else(|| type_error("server not found"))?; server_ctx.listening_rx.take().unwrap() }; - match listening_rx.recv().await { - Some(Ok(port)) => Ok(port), - Some(Err(e)) => Err(e.into()), - _ => Err(generic_error( - "unknown error occurred while waiting for listening", - )), - } + Ok(async move { + if let Some(port) = listening_rx.recv().await { + Ok(port) + } else { + Err(generic_error("This error will be discarded")) + } + }) +} + +#[op] +fn op_flash_drive_server( + state: &mut OpState, + server_id: u32, +) -> Result> + 'static, AnyError> { + let join_handle = { + let flash_ctx = state.borrow_mut::(); + flash_ctx + .join_handles + .remove(&server_id) + .ok_or_else(|| type_error("server not found"))? + }; + Ok(async move { + join_handle + .await + .map_err(|_| type_error("server join error"))??; + Ok(()) + }) } // Asychronous version of op_flash_next. This can be a bottleneck under @@ -1329,34 +1291,26 @@ async fn op_flash_wait_for_listening( // requests i.e `op_flash_next() == 0`. #[op] async fn op_flash_next_async( - state: Rc>, + op_state: Rc>, server_id: u32, ) -> u32 { - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - let cancel_handle = ctx.cancel_handle.clone(); - let mut rx = ctx.rx.take().unwrap(); - // We need to drop the borrow before await point. - drop(op_state); - - if let Ok(Some(req)) = rx.recv().or_cancel(&cancel_handle).await { - let mut op_state = state.borrow_mut(); + let ctx = { + let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + ctx as *mut ServerContext + }; + // SAFETY: we cannot hold op_state borrow across the await point. The JS caller + // is responsible for ensuring this is not called concurrently. + let ctx = unsafe { &mut *ctx }; + let cancel_handle = &ctx.cancel_handle; + + if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { ctx.requests.insert(ctx.next_token, req); ctx.next_token += 1; - // Set the rx back. - ctx.rx = Some(rx); return 1; } - // Set the rx back. - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - if let Some(ctx) = flash_ctx.servers.get_mut(&server_id) { - ctx.rx = Some(rx); - } 0 } @@ -1524,11 +1478,11 @@ pub fn init(unstable: bool) -> Extension { op_flash_next_async::decl(), op_flash_read_body::decl(), op_flash_upgrade_websocket::decl(), + op_flash_drive_server::decl(), op_flash_wait_for_listening::decl(), op_flash_first_packet::decl(), op_flash_has_body_stream::decl(), op_flash_close_server::decl(), - op_flash_drive_server::decl(), op_flash_make_request::decl(), op_flash_write_resource::decl(), ])