1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-22 15:06:54 -05:00

fix(ext/flash): graceful server startup/shutdown with unsettled promises in mind (#16616)

This PR resets the revert commit made by #16610, bringing back #16383
which attempts to fix the issue happening when we use the flash server
with `--watch` option enabled.
Also, some code changes are made to pass the regression test added in
#16610.
This commit is contained in:
Yusuke Tanaka 2022-11-25 02:38:09 +09:00 committed by GitHub
parent b6f49cf479
commit fd023cf793
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 305 additions and 148 deletions

View file

@ -1167,3 +1167,68 @@ fn run_watch_dynamic_imports() {
check_alive_then_kill(child); check_alive_then_kill(child);
} }
// https://github.com/denoland/deno/issues/16267
#[test]
fn run_watch_flash() {
let filename = "watch_flash.js";
let t = TempDir::new();
let file_to_watch = t.path().join(filename);
write(
&file_to_watch,
r#"
console.log("Starting flash server...");
Deno.serve({
onListen() {
console.error("First server is listening");
},
handler: () => {},
port: 4601,
});
"#,
)
.unwrap();
let mut child = util::deno_cmd()
.current_dir(t.path())
.arg("run")
.arg("--watch")
.arg("--unstable")
.arg("--allow-net")
.arg("-L")
.arg("debug")
.arg(&file_to_watch)
.env("NO_COLOR", "1")
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);
wait_contains("Starting flash server...", &mut stdout_lines);
wait_for(
|m| m.contains("Watching paths") && m.contains(filename),
&mut stderr_lines,
);
write(
&file_to_watch,
r#"
console.log("Restarting flash server...");
Deno.serve({
onListen() {
console.error("Second server is listening");
},
handler: () => {},
port: 4601,
});
"#,
)
.unwrap();
wait_contains("File change detected! Restarting!", &mut stderr_lines);
wait_contains("Restarting flash server...", &mut stdout_lines);
wait_contains("Second server is listening", &mut stderr_lines);
check_alive_then_kill(child);
}

View file

@ -57,32 +57,42 @@ Deno.test(async function httpServerCanResolveHostnames() {
await server; await server;
}); });
Deno.test(async function httpServerRejectsOnAddrInUse() { // TODO(magurotuna): ignore this case for now because it's flaky on GitHub Actions,
const ac = new AbortController(); // although it acts as expected when running locally.
// See https://github.com/denoland/deno/pull/16616
Deno.test({ ignore: true }, async function httpServerRejectsOnAddrInUse() {
const ac1 = new AbortController();
const listeningPromise = deferred(); const listeningPromise = deferred();
let port: number;
const server = Deno.serve({ const server = Deno.serve({
handler: (_req) => new Response("ok"), handler: (_req) => new Response("ok"),
hostname: "localhost", hostname: "localhost",
port: 4501, port: 0,
signal: ac.signal, signal: ac1.signal,
onListen: onListen(listeningPromise), onListen: (addr) => {
onError: createOnErrorCb(ac), port = addr.port;
listeningPromise.resolve();
},
onError: createOnErrorCb(ac1),
}); });
await listeningPromise;
const ac2 = new AbortController();
assertRejects( assertRejects(
() => () =>
Deno.serve({ Deno.serve({
handler: (_req) => new Response("ok"), handler: (_req) => new Response("ok"),
hostname: "localhost", hostname: "localhost",
port: 4501, port,
signal: ac.signal, signal: ac2.signal,
onListen: onListen(listeningPromise),
onError: createOnErrorCb(ac),
}), }),
Deno.errors.AddrInUse, Deno.errors.AddrInUse,
); );
ac.abort();
ac1.abort();
ac2.abort();
await server; await server;
}); });

View file

@ -188,8 +188,8 @@
return str; return str;
} }
function prepareFastCalls() { function prepareFastCalls(serverId) {
return core.ops.op_flash_make_request(); return core.ops.op_flash_make_request(serverId);
} }
function hostnameForDisplay(hostname) { function hostnameForDisplay(hostname) {
@ -495,15 +495,11 @@
const serverId = opFn(listenOpts); const serverId = opFn(listenOpts);
const serverPromise = core.opAsync("op_flash_drive_server", serverId); const serverPromise = core.opAsync("op_flash_drive_server", serverId);
const listenPromise = PromisePrototypeThen(
PromisePrototypeCatch(
PromisePrototypeThen(
core.opAsync("op_flash_wait_for_listening", serverId), core.opAsync("op_flash_wait_for_listening", serverId),
(port) => { (port) => {
onListen({ hostname: listenOpts.hostname, port }); onListen({ hostname: listenOpts.hostname, port });
}, },
),
() => {},
); );
const finishedPromise = PromisePrototypeCatch(serverPromise, () => {}); const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
@ -519,7 +515,7 @@
return; return;
} }
server.closed = true; server.closed = true;
await core.opAsync("op_flash_close_server", serverId); core.ops.op_flash_close_server(serverId);
await server.finished; await server.finished;
}, },
async serve() { async serve() {
@ -634,7 +630,7 @@
signal?.addEventListener("abort", () => { signal?.addEventListener("abort", () => {
clearInterval(dateInterval); clearInterval(dateInterval);
PromisePrototypeThen(server.close(), () => {}, () => {}); server.close();
}, { }, {
once: true, once: true,
}); });
@ -668,7 +664,7 @@
); );
} }
const fastOp = prepareFastCalls(); const fastOp = prepareFastCalls(serverId);
let nextRequestSync = () => fastOp.nextRequest(); let nextRequestSync = () => fastOp.nextRequest();
let getMethodSync = (token) => fastOp.getMethod(token); let getMethodSync = (token) => fastOp.getMethod(token);
let respondFast = (token, response, shutdown) => let respondFast = (token, response, shutdown) =>
@ -688,8 +684,8 @@
} }
await SafePromiseAll([ await SafePromiseAll([
listenPromise,
PromisePrototypeCatch(server.serve(), console.error), PromisePrototypeCatch(server.serve(), console.error),
serverPromise,
]); ]);
}; };
} }

View file

@ -35,6 +35,7 @@ use mio::Events;
use mio::Interest; use mio::Interest;
use mio::Poll; use mio::Poll;
use mio::Token; use mio::Token;
use mio::Waker;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use socket2::Socket; use socket2::Socket;
@ -47,6 +48,7 @@ use std::intrinsics::transmute;
use std::io::BufReader; use std::io::BufReader;
use std::io::Read; use std::io::Read;
use std::io::Write; use std::io::Write;
use std::marker::PhantomPinned;
use std::mem::replace; use std::mem::replace;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
@ -55,8 +57,8 @@ use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::Context; use std::task::Context;
use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
mod chunked; mod chunked;
@ -76,15 +78,24 @@ pub struct FlashContext {
pub servers: HashMap<u32, ServerContext>, pub servers: HashMap<u32, ServerContext>,
} }
impl Drop for FlashContext {
fn drop(&mut self) {
// Signal each server instance to shutdown.
for (_, server) in self.servers.drain() {
let _ = server.waker.wake();
}
}
}
pub struct ServerContext { pub struct ServerContext {
_addr: SocketAddr, _addr: SocketAddr,
tx: mpsc::Sender<Request>, tx: mpsc::Sender<Request>,
rx: mpsc::Receiver<Request>, rx: Option<mpsc::Receiver<Request>>,
requests: HashMap<u32, Request>, requests: HashMap<u32, Request>,
next_token: u32, next_token: u32,
listening_rx: Option<mpsc::Receiver<u16>>, listening_rx: Option<mpsc::Receiver<Result<u16, std::io::Error>>>,
close_tx: mpsc::Sender<()>,
cancel_handle: Rc<CancelHandle>, cancel_handle: Rc<CancelHandle>,
waker: Arc<Waker>,
} }
#[derive(Debug, Eq, PartialEq)] #[derive(Debug, Eq, PartialEq)]
@ -102,7 +113,10 @@ fn op_flash_respond(
shutdown: bool, shutdown: bool,
) -> u32 { ) -> u32 {
let flash_ctx = op_state.borrow_mut::<FlashContext>(); let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let ctx = match flash_ctx.servers.get_mut(&server_id) {
Some(ctx) => ctx,
None => return 0,
};
flash_respond(ctx, token, shutdown, &response) flash_respond(ctx, token, shutdown, &response)
} }
@ -116,7 +130,7 @@ fn op_try_flash_respond_chuncked(
) -> u32 { ) -> u32 {
let flash_ctx = op_state.borrow_mut::<FlashContext>(); let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tx = ctx.requests.get(&token).unwrap(); let tx = ctx.requests.get_mut(&token).unwrap();
let sock = tx.socket(); let sock = tx.socket();
// TODO(@littledivy): Use writev when `UnixIoSlice` lands. // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
@ -153,17 +167,20 @@ async fn op_flash_respond_async(
let sock = { let sock = {
let mut op_state = state.borrow_mut(); let mut op_state = state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>(); let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let ctx = match flash_ctx.servers.get_mut(&server_id) {
Some(ctx) => ctx,
None => return Ok(()),
};
match shutdown { match shutdown {
true => { true => {
let tx = ctx.requests.remove(&token).unwrap(); let mut tx = ctx.requests.remove(&token).unwrap();
close = !tx.keep_alive; close = !tx.keep_alive;
tx.socket() tx.socket()
} }
// In case of a websocket upgrade or streaming response. // In case of a websocket upgrade or streaming response.
false => { false => {
let tx = ctx.requests.get(&token).unwrap(); let tx = ctx.requests.get_mut(&token).unwrap();
tx.socket() tx.socket()
} }
} }
@ -197,12 +214,12 @@ async fn op_flash_respond_chuncked(
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let sock = match shutdown { let sock = match shutdown {
true => { true => {
let tx = ctx.requests.remove(&token).unwrap(); let mut tx = ctx.requests.remove(&token).unwrap();
tx.socket() tx.socket()
} }
// In case of a websocket upgrade or streaming response. // In case of a websocket upgrade or streaming response.
false => { false => {
let tx = ctx.requests.get(&token).unwrap(); let tx = ctx.requests.get_mut(&token).unwrap();
tx.socket() tx.socket()
} }
}; };
@ -344,7 +361,7 @@ fn flash_respond(
shutdown: bool, shutdown: bool,
response: &[u8], response: &[u8],
) -> u32 { ) -> u32 {
let tx = ctx.requests.get(&token).unwrap(); let tx = ctx.requests.get_mut(&token).unwrap();
let sock = tx.socket(); let sock = tx.socket();
sock.read_tx.take(); sock.read_tx.take();
@ -428,15 +445,36 @@ fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 {
} }
#[op] #[op]
async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) { fn op_flash_drive_server(
let close_tx = { state: &mut OpState,
let mut op_state = state.borrow_mut(); server_id: u32,
let flash_ctx = op_state.borrow_mut::<FlashContext>(); ) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let join_handle = {
ctx.cancel_handle.cancel(); let flash_ctx = state.borrow_mut::<FlashContext>();
ctx.close_tx.clone() flash_ctx
.join_handles
.remove(&server_id)
.ok_or_else(|| type_error("server not found"))?
}; };
let _ = close_tx.send(()).await; Ok(async move {
join_handle
.await
.map_err(|_| type_error("server join error"))??;
Ok(())
})
}
#[op]
fn op_flash_close_server(state: &mut OpState, server_id: u32) {
let flash_ctx = state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get(&server_id).unwrap();
// NOTE: We don't drop ServerContext associated with the given `server_id`,
// because it may still be in use by some unsettled promise after the flash
// thread is finished.
ctx.cancel_handle.cancel();
let _ = ctx.waker.wake();
} }
#[op] #[op]
@ -463,7 +501,7 @@ fn op_flash_path(
fn next_request_sync(ctx: &mut ServerContext) -> u32 { fn next_request_sync(ctx: &mut ServerContext) -> u32 {
let offset = ctx.next_token; let offset = ctx.next_token;
while let Ok(token) = ctx.rx.try_recv() { while let Ok(token) = ctx.rx.as_mut().unwrap().try_recv() {
ctx.requests.insert(ctx.next_token, token); ctx.requests.insert(ctx.next_token, token);
ctx.next_token += 1; ctx.next_token += 1;
} }
@ -526,6 +564,7 @@ unsafe fn op_flash_get_method_fast(
fn op_flash_make_request<'scope>( fn op_flash_make_request<'scope>(
scope: &mut v8::HandleScope<'scope>, scope: &mut v8::HandleScope<'scope>,
state: &mut OpState, state: &mut OpState,
server_id: u32,
) -> serde_v8::Value<'scope> { ) -> serde_v8::Value<'scope> {
let object_template = v8::ObjectTemplate::new(scope); let object_template = v8::ObjectTemplate::new(scope);
assert!(object_template assert!(object_template
@ -533,7 +572,7 @@ fn op_flash_make_request<'scope>(
let obj = object_template.new_instance(scope).unwrap(); let obj = object_template.new_instance(scope).unwrap();
let ctx = { let ctx = {
let flash_ctx = state.borrow_mut::<FlashContext>(); let flash_ctx = state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&0).unwrap(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
ctx as *mut ServerContext ctx as *mut ServerContext
}; };
obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _); obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
@ -625,7 +664,7 @@ fn op_flash_make_request<'scope>(
} }
#[inline] #[inline]
fn has_body_stream(req: &Request) -> bool { fn has_body_stream(req: &mut Request) -> bool {
let sock = req.socket(); let sock = req.socket();
sock.read_rx.is_some() sock.read_rx.is_some()
} }
@ -749,7 +788,10 @@ async fn op_flash_read_body(
{ {
let op_state = &mut state.borrow_mut(); let op_state = &mut state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>(); let flash_ctx = op_state.borrow_mut::<FlashContext>();
flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext match flash_ctx.servers.get_mut(&server_id) {
Some(ctx) => ctx as *mut ServerContext,
None => return 0,
}
} }
.as_mut() .as_mut()
.unwrap() .unwrap()
@ -851,41 +893,40 @@ pub struct ListenOpts {
reuseport: bool, reuseport: bool,
} }
const SERVER_TOKEN: Token = Token(0);
// Token reserved for the thread close signal.
const WAKER_TOKEN: Token = Token(1);
#[allow(clippy::too_many_arguments)]
fn run_server( fn run_server(
tx: mpsc::Sender<Request>, tx: mpsc::Sender<Request>,
listening_tx: mpsc::Sender<u16>, listening_tx: mpsc::Sender<Result<u16, std::io::Error>>,
mut close_rx: mpsc::Receiver<()>,
addr: SocketAddr, addr: SocketAddr,
maybe_cert: Option<String>, maybe_cert: Option<String>,
maybe_key: Option<String>, maybe_key: Option<String>,
reuseport: bool, reuseport: bool,
mut poll: Poll,
// We put a waker as an unused argument here as it needs to be alive both in
// the flash thread and in the main thread (otherwise the notification would
// not be caught by the event loop on Linux).
// See the comment in mio's example:
// https://docs.rs/mio/0.8.4/x86_64-unknown-linux-gnu/mio/struct.Waker.html#examples
_waker: Arc<Waker>,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let domain = if addr.is_ipv4() { let mut listener = match listen(addr, reuseport) {
socket2::Domain::IPV4 Ok(listener) => listener,
} else { Err(e) => {
socket2::Domain::IPV6 listening_tx.blocking_send(Err(e)).unwrap();
}; return Err(generic_error(
let socket = Socket::new(domain, socket2::Type::STREAM, None)?; "failed to start listening on the specified address",
));
#[cfg(not(windows))]
socket.set_reuse_address(true)?;
if reuseport {
#[cfg(target_os = "linux")]
socket.set_reuse_port(true)?;
} }
};
let socket_addr = socket2::SockAddr::from(addr); // Register server.
socket.bind(&socket_addr)?;
socket.listen(128)?;
socket.set_nonblocking(true)?;
let std_listener: std::net::TcpListener = socket.into();
let mut listener = TcpListener::from_std(std_listener);
let mut poll = Poll::new()?;
let token = Token(0);
poll poll
.registry() .registry()
.register(&mut listener, token, Interest::READABLE) .register(&mut listener, SERVER_TOKEN, Interest::READABLE)
.unwrap(); .unwrap();
let tls_context: Option<Arc<rustls::ServerConfig>> = { let tls_context: Option<Arc<rustls::ServerConfig>> = {
@ -907,30 +948,25 @@ fn run_server(
}; };
listening_tx listening_tx
.blocking_send(listener.local_addr().unwrap().port()) .blocking_send(Ok(listener.local_addr().unwrap().port()))
.unwrap(); .unwrap();
let mut sockets = HashMap::with_capacity(1000); let mut sockets = HashMap::with_capacity(1000);
let mut counter: usize = 1; let mut socket_senders = HashMap::with_capacity(1000);
let mut counter: usize = 2;
let mut events = Events::with_capacity(1024); let mut events = Events::with_capacity(1024);
'outer: loop { 'outer: loop {
let result = close_rx.try_recv(); match poll.poll(&mut events, None) {
if result.is_ok() {
break 'outer;
}
// FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms
// timeout here to handle close signal.
match poll.poll(&mut events, Some(Duration::from_millis(100))) {
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => panic!("{}", e), Err(e) => panic!("{}", e),
Ok(()) => (), Ok(()) => (),
} }
'events: for event in &events { 'events: for event in &events {
if close_rx.try_recv().is_ok() {
break 'outer;
}
let token = event.token(); let token = event.token();
match token { match token {
Token(0) => loop { WAKER_TOKEN => {
break 'outer;
}
SERVER_TOKEN => loop {
match listener.accept() { match listener.accept() {
Ok((mut socket, _)) => { Ok((mut socket, _)) => {
counter += 1; counter += 1;
@ -958,6 +994,7 @@ fn run_server(
read_lock: Arc::new(Mutex::new(())), read_lock: Arc::new(Mutex::new(())),
parse_done: ParseStatus::None, parse_done: ParseStatus::None,
buffer: UnsafeCell::new(vec![0_u8; 1024]), buffer: UnsafeCell::new(vec![0_u8; 1024]),
_pinned: PhantomPinned,
}); });
trace!("New connection: {}", token.0); trace!("New connection: {}", token.0);
@ -974,7 +1011,6 @@ fn run_server(
let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket); let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket);
Pin::get_unchecked_mut(mut_ref) Pin::get_unchecked_mut(mut_ref)
}; };
let sock_ptr = socket as *mut _;
if socket.detached { if socket.detached {
match &mut socket.inner { match &mut socket.inner {
@ -988,6 +1024,7 @@ fn run_server(
let boxed = sockets.remove(&token).unwrap(); let boxed = sockets.remove(&token).unwrap();
std::mem::forget(boxed); std::mem::forget(boxed);
socket_senders.remove(&token);
trace!("Socket detached: {}", token.0); trace!("Socket detached: {}", token.0);
continue; continue;
} }
@ -1173,8 +1210,10 @@ fn run_server(
continue 'events; continue 'events;
} }
let (socket_tx, socket_rx) = oneshot::channel();
tx.blocking_send(Request { tx.blocking_send(Request {
socket: sock_ptr, socket: socket as *mut _,
// SAFETY: headers backing buffer outlives the mio event loop ('static) // SAFETY: headers backing buffer outlives the mio event loop ('static)
inner: inner_req, inner: inner_req,
keep_alive, keep_alive,
@ -1183,16 +1222,57 @@ fn run_server(
content_read: 0, content_read: 0,
content_length, content_length,
expect_continue, expect_continue,
socket_rx,
owned_socket: None,
}) })
.ok(); .ok();
socket_senders.insert(token, socket_tx);
} }
} }
} }
} }
// Now the flash thread is about to finish, but there may be some unsettled
// promises in the main thread that will use the socket. To make the socket
// alive longer enough, we move its ownership to the main thread.
for (tok, socket) in sockets {
if let Some(sender) = socket_senders.remove(&tok) {
// Do nothing if the receiver has already been dropped.
_ = sender.send(socket);
}
}
Ok(()) Ok(())
} }
#[inline]
fn listen(
addr: SocketAddr,
reuseport: bool,
) -> Result<TcpListener, std::io::Error> {
let domain = if addr.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
#[cfg(not(windows))]
socket.set_reuse_address(true)?;
if reuseport {
#[cfg(target_os = "linux")]
socket.set_reuse_port(true)?;
}
let socket_addr = socket2::SockAddr::from(addr);
socket.bind(&socket_addr)?;
socket.listen(128)?;
socket.set_nonblocking(true)?;
let std_listener: std::net::TcpListener = socket.into();
Ok(TcpListener::from_std(std_listener))
}
fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) {
// Default to localhost if given just the port. Example: ":80" // Default to localhost if given just the port. Example: ":80"
if hostname.is_empty() { if hostname.is_empty() {
@ -1230,17 +1310,19 @@ where
.next() .next()
.ok_or_else(|| generic_error("No resolved address found"))?; .ok_or_else(|| generic_error("No resolved address found"))?;
let (tx, rx) = mpsc::channel(100); let (tx, rx) = mpsc::channel(100);
let (close_tx, close_rx) = mpsc::channel(1);
let (listening_tx, listening_rx) = mpsc::channel(1); let (listening_tx, listening_rx) = mpsc::channel(1);
let poll = Poll::new()?;
let waker = Arc::new(Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
let ctx = ServerContext { let ctx = ServerContext {
_addr: addr, _addr: addr,
tx, tx,
rx, rx: Some(rx),
requests: HashMap::with_capacity(1000), requests: HashMap::with_capacity(1000),
next_token: 0, next_token: 0,
close_tx,
listening_rx: Some(listening_rx), listening_rx: Some(listening_rx),
cancel_handle: CancelHandle::new_rc(), cancel_handle: CancelHandle::new_rc(),
waker: waker.clone(),
}; };
let tx = ctx.tx.clone(); let tx = ctx.tx.clone();
let maybe_cert = opts.cert; let maybe_cert = opts.cert;
@ -1250,11 +1332,12 @@ where
run_server( run_server(
tx, tx,
listening_tx, listening_tx,
close_rx,
addr, addr,
maybe_cert, maybe_cert,
maybe_key, maybe_key,
reuseport, reuseport,
poll,
waker,
) )
}); });
let flash_ctx = state.borrow_mut::<FlashContext>(); let flash_ctx = state.borrow_mut::<FlashContext>();
@ -1289,45 +1372,26 @@ where
} }
#[op] #[op]
fn op_flash_wait_for_listening( async fn op_flash_wait_for_listening(
state: &mut OpState, state: Rc<RefCell<OpState>>,
server_id: u32, server_id: u32,
) -> Result<impl Future<Output = Result<u16, AnyError>> + 'static, AnyError> { ) -> Result<u16, AnyError> {
let mut listening_rx = { let mut listening_rx = {
let flash_ctx = state.borrow_mut::<FlashContext>(); let mut op_state = state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let server_ctx = flash_ctx let server_ctx = flash_ctx
.servers .servers
.get_mut(&server_id) .get_mut(&server_id)
.ok_or_else(|| type_error("server not found"))?; .ok_or_else(|| type_error("server not found"))?;
server_ctx.listening_rx.take().unwrap() server_ctx.listening_rx.take().unwrap()
}; };
Ok(async move { match listening_rx.recv().await {
if let Some(port) = listening_rx.recv().await { Some(Ok(port)) => Ok(port),
Ok(port) Some(Err(e)) => Err(e.into()),
} else { _ => Err(generic_error(
Err(generic_error("This error will be discarded")) "unknown error occurred while waiting for listening",
)),
} }
})
}
#[op]
fn op_flash_drive_server(
state: &mut OpState,
server_id: u32,
) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
let join_handle = {
let flash_ctx = state.borrow_mut::<FlashContext>();
flash_ctx
.join_handles
.remove(&server_id)
.ok_or_else(|| type_error("server not found"))?
};
Ok(async move {
join_handle
.await
.map_err(|_| type_error("server join error"))??;
Ok(())
})
} }
// Asychronous version of op_flash_next. This can be a bottleneck under // Asychronous version of op_flash_next. This can be a bottleneck under
@ -1335,26 +1399,34 @@ fn op_flash_drive_server(
// requests i.e `op_flash_next() == 0`. // requests i.e `op_flash_next() == 0`.
#[op] #[op]
async fn op_flash_next_async( async fn op_flash_next_async(
op_state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
server_id: u32, server_id: u32,
) -> u32 { ) -> u32 {
let ctx = { let mut op_state = state.borrow_mut();
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>(); let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
ctx as *mut ServerContext let cancel_handle = ctx.cancel_handle.clone();
}; let mut rx = ctx.rx.take().unwrap();
// SAFETY: we cannot hold op_state borrow across the await point. The JS caller // We need to drop the borrow before await point.
// is responsible for ensuring this is not called concurrently. drop(op_state);
let ctx = unsafe { &mut *ctx };
let cancel_handle = &ctx.cancel_handle;
if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { if let Ok(Some(req)) = rx.recv().or_cancel(&cancel_handle).await {
let mut op_state = state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
ctx.requests.insert(ctx.next_token, req); ctx.requests.insert(ctx.next_token, req);
ctx.next_token += 1; ctx.next_token += 1;
// Set the rx back.
ctx.rx = Some(rx);
return 1; return 1;
} }
// Set the rx back.
let mut op_state = state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
if let Some(ctx) = flash_ctx.servers.get_mut(&server_id) {
ctx.rx = Some(rx);
}
0 0
} }
@ -1427,7 +1499,7 @@ pub fn detach_socket(
// dropped on the server thread. // dropped on the server thread.
// * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we
// use raw fds. // use raw fds.
let tx = ctx let mut tx = ctx
.requests .requests
.remove(&token) .remove(&token)
.ok_or_else(|| type_error("request closed"))?; .ok_or_else(|| type_error("request closed"))?;
@ -1522,11 +1594,11 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_next_async::decl(), op_flash_next_async::decl(),
op_flash_read_body::decl(), op_flash_read_body::decl(),
op_flash_upgrade_websocket::decl(), op_flash_upgrade_websocket::decl(),
op_flash_drive_server::decl(),
op_flash_wait_for_listening::decl(), op_flash_wait_for_listening::decl(),
op_flash_first_packet::decl(), op_flash_first_packet::decl(),
op_flash_has_body_stream::decl(), op_flash_has_body_stream::decl(),
op_flash_close_server::decl(), op_flash_close_server::decl(),
op_flash_drive_server::decl(),
op_flash_make_request::decl(), op_flash_make_request::decl(),
op_flash_write_resource::decl(), op_flash_write_resource::decl(),
op_try_flash_respond_chuncked::decl(), op_try_flash_respond_chuncked::decl(),

View file

@ -2,6 +2,7 @@
use crate::Stream; use crate::Stream;
use std::pin::Pin; use std::pin::Pin;
use tokio::sync::oneshot;
#[derive(Debug)] #[derive(Debug)]
pub struct InnerRequest { pub struct InnerRequest {
@ -20,8 +21,7 @@ pub struct Request {
pub inner: InnerRequest, pub inner: InnerRequest,
// Pointer to stream owned by the server loop thread. // Pointer to stream owned by the server loop thread.
// //
// Dereferencing is safe until server thread finishes and // Dereferencing is safe until websocket upgrade is performed.
// op_flash_serve resolves or websocket upgrade is performed.
pub socket: *mut Stream, pub socket: *mut Stream,
pub keep_alive: bool, pub keep_alive: bool,
pub content_read: usize, pub content_read: usize,
@ -29,6 +29,8 @@ pub struct Request {
pub remaining_chunk_size: Option<usize>, pub remaining_chunk_size: Option<usize>,
pub te_chunked: bool, pub te_chunked: bool,
pub expect_continue: bool, pub expect_continue: bool,
pub socket_rx: oneshot::Receiver<Pin<Box<Stream>>>,
pub owned_socket: Option<Pin<Box<Stream>>>,
} }
// SAFETY: Sent from server thread to JS thread. // SAFETY: Sent from server thread to JS thread.
@ -37,8 +39,16 @@ unsafe impl Send for Request {}
impl Request { impl Request {
#[inline(always)] #[inline(always)]
pub fn socket<'a>(&self) -> &'a mut Stream { pub fn socket<'a>(&mut self) -> &'a mut Stream {
// SAFETY: Dereferencing is safe until server thread detaches socket or finishes. if let Ok(mut sock) = self.socket_rx.try_recv() {
// SAFETY: We never move the data out of the acquired mutable reference.
self.socket = unsafe { sock.as_mut().get_unchecked_mut() };
// Let the struct own the socket so that it won't get dropped.
self.owned_socket = Some(sock);
}
// SAFETY: Dereferencing is safe until server thread detaches socket.
unsafe { &mut *self.socket } unsafe { &mut *self.socket }
} }

View file

@ -1,23 +1,26 @@
use deno_core::error::AnyError; use deno_core::error::AnyError;
use mio::net::TcpStream; use mio::net::TcpStream;
use std::{ use std::cell::UnsafeCell;
cell::UnsafeCell, use std::future::Future;
future::Future, use std::io::Read;
io::{Read, Write}, use std::io::Write;
pin::Pin, use std::marker::PhantomPinned;
sync::{Arc, Mutex}, use std::pin::Pin;
}; use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::ParseStatus; use crate::ParseStatus;
type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>; type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
#[derive(Debug)]
pub enum InnerStream { pub enum InnerStream {
Tcp(TcpStream), Tcp(TcpStream),
Tls(Box<TlsTcpStream>), Tls(Box<TlsTcpStream>),
} }
#[derive(Debug)]
pub struct Stream { pub struct Stream {
pub inner: InnerStream, pub inner: InnerStream,
pub detached: bool, pub detached: bool,
@ -26,6 +29,7 @@ pub struct Stream {
pub parse_done: ParseStatus, pub parse_done: ParseStatus,
pub buffer: UnsafeCell<Vec<u8>>, pub buffer: UnsafeCell<Vec<u8>>,
pub read_lock: Arc<Mutex<()>>, pub read_lock: Arc<Mutex<()>>,
pub _pinned: PhantomPinned,
} }
impl Stream { impl Stream {