mirror of
https://github.com/denoland/deno.git
synced 2024-12-22 23:34:47 -05:00
#16383 made some of Node compat test cases flaky in deno_std (and when it fails it causes segfaults). See https://github.com/denoland/deno_std/issues/2882 for details
This commit is contained in:
parent
88643aa478
commit
336e96a114
6 changed files with 145 additions and 224 deletions
|
@ -3642,3 +3642,9 @@ itest!(no_lock_flag {
|
|||
http_server: true,
|
||||
exit_code: 0,
|
||||
});
|
||||
|
||||
// Check https://github.com/denoland/deno_std/issues/2882
|
||||
itest!(flash_shutdown {
|
||||
args: "run --unstable --allow-net run/flash_shutdown/main.ts",
|
||||
exit_code: 0,
|
||||
});
|
||||
|
|
|
@ -1167,68 +1167,3 @@ fn run_watch_dynamic_imports() {
|
|||
|
||||
check_alive_then_kill(child);
|
||||
}
|
||||
|
||||
// https://github.com/denoland/deno/issues/16267
|
||||
#[test]
|
||||
fn run_watch_flash() {
|
||||
let filename = "watch_flash.js";
|
||||
let t = TempDir::new();
|
||||
let file_to_watch = t.path().join(filename);
|
||||
write(
|
||||
&file_to_watch,
|
||||
r#"
|
||||
console.log("Starting flash server...");
|
||||
Deno.serve({
|
||||
onListen() {
|
||||
console.error("First server is listening");
|
||||
},
|
||||
handler: () => {},
|
||||
port: 4601,
|
||||
});
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut child = util::deno_cmd()
|
||||
.current_dir(t.path())
|
||||
.arg("run")
|
||||
.arg("--watch")
|
||||
.arg("--unstable")
|
||||
.arg("--allow-net")
|
||||
.arg("-L")
|
||||
.arg("debug")
|
||||
.arg(&file_to_watch)
|
||||
.env("NO_COLOR", "1")
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);
|
||||
|
||||
wait_contains("Starting flash server...", &mut stdout_lines);
|
||||
wait_for(
|
||||
|m| m.contains("Watching paths") && m.contains(filename),
|
||||
&mut stderr_lines,
|
||||
);
|
||||
|
||||
write(
|
||||
&file_to_watch,
|
||||
r#"
|
||||
console.log("Restarting flash server...");
|
||||
Deno.serve({
|
||||
onListen() {
|
||||
console.error("Second server is listening");
|
||||
},
|
||||
handler: () => {},
|
||||
port: 4601,
|
||||
});
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
wait_contains("File change detected! Restarting!", &mut stderr_lines);
|
||||
wait_contains("Restarting flash server...", &mut stdout_lines);
|
||||
wait_contains("Second server is listening", &mut stderr_lines);
|
||||
|
||||
check_alive_then_kill(child);
|
||||
}
|
||||
|
|
23
cli/tests/testdata/run/flash_shutdown/main.ts
vendored
Normal file
23
cli/tests/testdata/run/flash_shutdown/main.ts
vendored
Normal file
|
@ -0,0 +1,23 @@
|
|||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
// Deno.serve caused segfault with this example after #16383
|
||||
// refs:
|
||||
// - https://github.com/denoland/deno/pull/16383
|
||||
// - https://github.com/denoland/deno_std/issues/2882
|
||||
// - revert https://github.com/denoland/deno/pull/16610
|
||||
|
||||
const ctl = new AbortController();
|
||||
Deno.serve(() =>
|
||||
new Promise((resolve) => {
|
||||
resolve(new Response(new TextEncoder().encode("ok")));
|
||||
ctl.abort();
|
||||
}), {
|
||||
signal: ctl.signal,
|
||||
async onListen({ port }) {
|
||||
const a = await fetch(`http://localhost:${port}`, {
|
||||
method: "POST",
|
||||
body: "",
|
||||
});
|
||||
await a.text();
|
||||
},
|
||||
});
|
|
@ -70,7 +70,6 @@ Deno.test(async function httpServerRejectsOnAddrInUse() {
|
|||
onError: createOnErrorCb(ac),
|
||||
});
|
||||
|
||||
await listeningPromise;
|
||||
assertRejects(
|
||||
() =>
|
||||
Deno.serve({
|
||||
|
|
|
@ -188,8 +188,8 @@
|
|||
return str;
|
||||
}
|
||||
|
||||
function prepareFastCalls(serverId) {
|
||||
return core.ops.op_flash_make_request(serverId);
|
||||
function prepareFastCalls() {
|
||||
return core.ops.op_flash_make_request();
|
||||
}
|
||||
|
||||
function hostnameForDisplay(hostname) {
|
||||
|
@ -482,11 +482,15 @@
|
|||
|
||||
const serverId = opFn(listenOpts);
|
||||
const serverPromise = core.opAsync("op_flash_drive_server", serverId);
|
||||
const listenPromise = PromisePrototypeThen(
|
||||
core.opAsync("op_flash_wait_for_listening", serverId),
|
||||
(port) => {
|
||||
onListen({ hostname: listenOpts.hostname, port });
|
||||
},
|
||||
|
||||
PromisePrototypeCatch(
|
||||
PromisePrototypeThen(
|
||||
core.opAsync("op_flash_wait_for_listening", serverId),
|
||||
(port) => {
|
||||
onListen({ hostname: listenOpts.hostname, port });
|
||||
},
|
||||
),
|
||||
() => {},
|
||||
);
|
||||
const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
|
||||
|
||||
|
@ -502,7 +506,7 @@
|
|||
return;
|
||||
}
|
||||
server.closed = true;
|
||||
core.ops.op_flash_close_server(serverId);
|
||||
await core.opAsync("op_flash_close_server", serverId);
|
||||
await server.finished;
|
||||
},
|
||||
async serve() {
|
||||
|
@ -614,7 +618,7 @@
|
|||
|
||||
signal?.addEventListener("abort", () => {
|
||||
clearInterval(dateInterval);
|
||||
server.close();
|
||||
PromisePrototypeThen(server.close(), () => {}, () => {});
|
||||
}, {
|
||||
once: true,
|
||||
});
|
||||
|
@ -629,7 +633,7 @@
|
|||
);
|
||||
}
|
||||
|
||||
const fastOp = prepareFastCalls(serverId);
|
||||
const fastOp = prepareFastCalls();
|
||||
let nextRequestSync = () => fastOp.nextRequest();
|
||||
let getMethodSync = (token) => fastOp.getMethod(token);
|
||||
let respondFast = (token, response, shutdown) =>
|
||||
|
@ -649,8 +653,8 @@
|
|||
}
|
||||
|
||||
await SafePromiseAll([
|
||||
listenPromise,
|
||||
PromisePrototypeCatch(server.serve(), console.error),
|
||||
serverPromise,
|
||||
]);
|
||||
};
|
||||
}
|
||||
|
|
248
ext/flash/lib.rs
248
ext/flash/lib.rs
|
@ -35,7 +35,6 @@ use mio::Events;
|
|||
use mio::Interest;
|
||||
use mio::Poll;
|
||||
use mio::Token;
|
||||
use mio::Waker;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use socket2::Socket;
|
||||
|
@ -56,6 +55,7 @@ use std::rc::Rc;
|
|||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::task::Context;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
|
@ -76,24 +76,15 @@ pub struct FlashContext {
|
|||
pub servers: HashMap<u32, ServerContext>,
|
||||
}
|
||||
|
||||
impl Drop for FlashContext {
|
||||
fn drop(&mut self) {
|
||||
// Signal each server instance to shutdown.
|
||||
for (_, server) in self.servers.drain() {
|
||||
let _ = server.waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ServerContext {
|
||||
_addr: SocketAddr,
|
||||
tx: mpsc::Sender<Request>,
|
||||
rx: Option<mpsc::Receiver<Request>>,
|
||||
rx: mpsc::Receiver<Request>,
|
||||
requests: HashMap<u32, Request>,
|
||||
next_token: u32,
|
||||
listening_rx: Option<mpsc::Receiver<Result<u16, std::io::Error>>>,
|
||||
listening_rx: Option<mpsc::Receiver<u16>>,
|
||||
close_tx: mpsc::Sender<()>,
|
||||
cancel_handle: Rc<CancelHandle>,
|
||||
waker: Arc<Waker>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
|
@ -111,10 +102,7 @@ fn op_flash_respond(
|
|||
shutdown: bool,
|
||||
) -> u32 {
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
let ctx = match flash_ctx.servers.get_mut(&server_id) {
|
||||
Some(ctx) => ctx,
|
||||
None => return 0,
|
||||
};
|
||||
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
||||
flash_respond(ctx, token, shutdown, &response)
|
||||
}
|
||||
|
||||
|
@ -132,10 +120,7 @@ async fn op_flash_respond_async(
|
|||
let sock = {
|
||||
let mut op_state = state.borrow_mut();
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
let ctx = match flash_ctx.servers.get_mut(&server_id) {
|
||||
Some(ctx) => ctx,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
||||
|
||||
match shutdown {
|
||||
true => {
|
||||
|
@ -399,30 +384,15 @@ fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 {
|
|||
}
|
||||
|
||||
#[op]
|
||||
fn op_flash_drive_server(
|
||||
state: &mut OpState,
|
||||
server_id: u32,
|
||||
) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
|
||||
let join_handle = {
|
||||
let flash_ctx = state.borrow_mut::<FlashContext>();
|
||||
flash_ctx
|
||||
.join_handles
|
||||
.remove(&server_id)
|
||||
.ok_or_else(|| type_error("server not found"))?
|
||||
async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) {
|
||||
let close_tx = {
|
||||
let mut op_state = state.borrow_mut();
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
||||
ctx.cancel_handle.cancel();
|
||||
ctx.close_tx.clone()
|
||||
};
|
||||
Ok(async move {
|
||||
join_handle
|
||||
.await
|
||||
.map_err(|_| type_error("server join error"))??;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[op]
|
||||
fn op_flash_close_server(state: &mut OpState, server_id: u32) {
|
||||
let flash_ctx = state.borrow_mut::<FlashContext>();
|
||||
let ctx = flash_ctx.servers.remove(&server_id).unwrap();
|
||||
let _ = ctx.waker.wake();
|
||||
let _ = close_tx.send(()).await;
|
||||
}
|
||||
|
||||
#[op]
|
||||
|
@ -449,7 +419,7 @@ fn op_flash_path(
|
|||
fn next_request_sync(ctx: &mut ServerContext) -> u32 {
|
||||
let offset = ctx.next_token;
|
||||
|
||||
while let Ok(token) = ctx.rx.as_mut().unwrap().try_recv() {
|
||||
while let Ok(token) = ctx.rx.try_recv() {
|
||||
ctx.requests.insert(ctx.next_token, token);
|
||||
ctx.next_token += 1;
|
||||
}
|
||||
|
@ -512,7 +482,6 @@ unsafe fn op_flash_get_method_fast(
|
|||
fn op_flash_make_request<'scope>(
|
||||
scope: &mut v8::HandleScope<'scope>,
|
||||
state: &mut OpState,
|
||||
server_id: u32,
|
||||
) -> serde_v8::Value<'scope> {
|
||||
let object_template = v8::ObjectTemplate::new(scope);
|
||||
assert!(object_template
|
||||
|
@ -520,7 +489,7 @@ fn op_flash_make_request<'scope>(
|
|||
let obj = object_template.new_instance(scope).unwrap();
|
||||
let ctx = {
|
||||
let flash_ctx = state.borrow_mut::<FlashContext>();
|
||||
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
||||
let ctx = flash_ctx.servers.get_mut(&0).unwrap();
|
||||
ctx as *mut ServerContext
|
||||
};
|
||||
obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
|
||||
|
@ -736,10 +705,7 @@ async fn op_flash_read_body(
|
|||
{
|
||||
let op_state = &mut state.borrow_mut();
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
match flash_ctx.servers.get_mut(&server_id) {
|
||||
Some(ctx) => ctx as *mut ServerContext,
|
||||
None => return 0,
|
||||
}
|
||||
flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext
|
||||
}
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
|
@ -841,40 +807,41 @@ pub struct ListenOpts {
|
|||
reuseport: bool,
|
||||
}
|
||||
|
||||
const SERVER_TOKEN: Token = Token(0);
|
||||
// Token reserved for the thread close signal.
|
||||
const WAKER_TOKEN: Token = Token(1);
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn run_server(
|
||||
tx: mpsc::Sender<Request>,
|
||||
listening_tx: mpsc::Sender<Result<u16, std::io::Error>>,
|
||||
listening_tx: mpsc::Sender<u16>,
|
||||
mut close_rx: mpsc::Receiver<()>,
|
||||
addr: SocketAddr,
|
||||
maybe_cert: Option<String>,
|
||||
maybe_key: Option<String>,
|
||||
reuseport: bool,
|
||||
mut poll: Poll,
|
||||
// We put a waker as an unused argument here as it needs to be alive both in
|
||||
// the flash thread and in the main thread (otherwise the notification would
|
||||
// not be caught by the event loop on Linux).
|
||||
// See the comment in mio's example:
|
||||
// https://docs.rs/mio/0.8.4/x86_64-unknown-linux-gnu/mio/struct.Waker.html#examples
|
||||
_waker: Arc<Waker>,
|
||||
) -> Result<(), AnyError> {
|
||||
let mut listener = match listen(addr, reuseport) {
|
||||
Ok(listener) => listener,
|
||||
Err(e) => {
|
||||
listening_tx.blocking_send(Err(e)).unwrap();
|
||||
return Err(generic_error(
|
||||
"failed to start listening on the specified address",
|
||||
));
|
||||
}
|
||||
let domain = if addr.is_ipv4() {
|
||||
socket2::Domain::IPV4
|
||||
} else {
|
||||
socket2::Domain::IPV6
|
||||
};
|
||||
let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
|
||||
|
||||
// Register server.
|
||||
#[cfg(not(windows))]
|
||||
socket.set_reuse_address(true)?;
|
||||
if reuseport {
|
||||
#[cfg(target_os = "linux")]
|
||||
socket.set_reuse_port(true)?;
|
||||
}
|
||||
|
||||
let socket_addr = socket2::SockAddr::from(addr);
|
||||
socket.bind(&socket_addr)?;
|
||||
socket.listen(128)?;
|
||||
socket.set_nonblocking(true)?;
|
||||
let std_listener: std::net::TcpListener = socket.into();
|
||||
let mut listener = TcpListener::from_std(std_listener);
|
||||
|
||||
let mut poll = Poll::new()?;
|
||||
let token = Token(0);
|
||||
poll
|
||||
.registry()
|
||||
.register(&mut listener, SERVER_TOKEN, Interest::READABLE)
|
||||
.register(&mut listener, token, Interest::READABLE)
|
||||
.unwrap();
|
||||
|
||||
let tls_context: Option<Arc<rustls::ServerConfig>> = {
|
||||
|
@ -896,24 +863,30 @@ fn run_server(
|
|||
};
|
||||
|
||||
listening_tx
|
||||
.blocking_send(Ok(listener.local_addr().unwrap().port()))
|
||||
.blocking_send(listener.local_addr().unwrap().port())
|
||||
.unwrap();
|
||||
let mut sockets = HashMap::with_capacity(1000);
|
||||
let mut counter: usize = 2;
|
||||
let mut counter: usize = 1;
|
||||
let mut events = Events::with_capacity(1024);
|
||||
'outer: loop {
|
||||
match poll.poll(&mut events, None) {
|
||||
let result = close_rx.try_recv();
|
||||
if result.is_ok() {
|
||||
break 'outer;
|
||||
}
|
||||
// FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms
|
||||
// timeout here to handle close signal.
|
||||
match poll.poll(&mut events, Some(Duration::from_millis(100))) {
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
|
||||
Err(e) => panic!("{}", e),
|
||||
Ok(()) => (),
|
||||
}
|
||||
'events: for event in &events {
|
||||
if close_rx.try_recv().is_ok() {
|
||||
break 'outer;
|
||||
}
|
||||
let token = event.token();
|
||||
match token {
|
||||
WAKER_TOKEN => {
|
||||
break 'outer;
|
||||
}
|
||||
SERVER_TOKEN => loop {
|
||||
Token(0) => loop {
|
||||
match listener.accept() {
|
||||
Ok((mut socket, _)) => {
|
||||
counter += 1;
|
||||
|
@ -1176,33 +1149,6 @@ fn run_server(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn listen(
|
||||
addr: SocketAddr,
|
||||
reuseport: bool,
|
||||
) -> Result<TcpListener, std::io::Error> {
|
||||
let domain = if addr.is_ipv4() {
|
||||
socket2::Domain::IPV4
|
||||
} else {
|
||||
socket2::Domain::IPV6
|
||||
};
|
||||
let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
socket.set_reuse_address(true)?;
|
||||
if reuseport {
|
||||
#[cfg(target_os = "linux")]
|
||||
socket.set_reuse_port(true)?;
|
||||
}
|
||||
|
||||
let socket_addr = socket2::SockAddr::from(addr);
|
||||
socket.bind(&socket_addr)?;
|
||||
socket.listen(128)?;
|
||||
socket.set_nonblocking(true)?;
|
||||
let std_listener: std::net::TcpListener = socket.into();
|
||||
Ok(TcpListener::from_std(std_listener))
|
||||
}
|
||||
|
||||
fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) {
|
||||
// Default to localhost if given just the port. Example: ":80"
|
||||
if hostname.is_empty() {
|
||||
|
@ -1240,19 +1186,17 @@ where
|
|||
.next()
|
||||
.ok_or_else(|| generic_error("No resolved address found"))?;
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let (close_tx, close_rx) = mpsc::channel(1);
|
||||
let (listening_tx, listening_rx) = mpsc::channel(1);
|
||||
|
||||
let poll = Poll::new()?;
|
||||
let waker = Arc::new(Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
|
||||
let ctx = ServerContext {
|
||||
_addr: addr,
|
||||
tx,
|
||||
rx: Some(rx),
|
||||
rx,
|
||||
requests: HashMap::with_capacity(1000),
|
||||
next_token: 0,
|
||||
close_tx,
|
||||
listening_rx: Some(listening_rx),
|
||||
cancel_handle: CancelHandle::new_rc(),
|
||||
waker: waker.clone(),
|
||||
};
|
||||
let tx = ctx.tx.clone();
|
||||
let maybe_cert = opts.cert;
|
||||
|
@ -1262,12 +1206,11 @@ where
|
|||
run_server(
|
||||
tx,
|
||||
listening_tx,
|
||||
close_rx,
|
||||
addr,
|
||||
maybe_cert,
|
||||
maybe_key,
|
||||
reuseport,
|
||||
poll,
|
||||
waker,
|
||||
)
|
||||
});
|
||||
let flash_ctx = state.borrow_mut::<FlashContext>();
|
||||
|
@ -1302,26 +1245,45 @@ where
|
|||
}
|
||||
|
||||
#[op]
|
||||
async fn op_flash_wait_for_listening(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
fn op_flash_wait_for_listening(
|
||||
state: &mut OpState,
|
||||
server_id: u32,
|
||||
) -> Result<u16, AnyError> {
|
||||
) -> Result<impl Future<Output = Result<u16, AnyError>> + 'static, AnyError> {
|
||||
let mut listening_rx = {
|
||||
let mut op_state = state.borrow_mut();
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
let flash_ctx = state.borrow_mut::<FlashContext>();
|
||||
let server_ctx = flash_ctx
|
||||
.servers
|
||||
.get_mut(&server_id)
|
||||
.ok_or_else(|| type_error("server not found"))?;
|
||||
server_ctx.listening_rx.take().unwrap()
|
||||
};
|
||||
match listening_rx.recv().await {
|
||||
Some(Ok(port)) => Ok(port),
|
||||
Some(Err(e)) => Err(e.into()),
|
||||
_ => Err(generic_error(
|
||||
"unknown error occurred while waiting for listening",
|
||||
)),
|
||||
}
|
||||
Ok(async move {
|
||||
if let Some(port) = listening_rx.recv().await {
|
||||
Ok(port)
|
||||
} else {
|
||||
Err(generic_error("This error will be discarded"))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[op]
|
||||
fn op_flash_drive_server(
|
||||
state: &mut OpState,
|
||||
server_id: u32,
|
||||
) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
|
||||
let join_handle = {
|
||||
let flash_ctx = state.borrow_mut::<FlashContext>();
|
||||
flash_ctx
|
||||
.join_handles
|
||||
.remove(&server_id)
|
||||
.ok_or_else(|| type_error("server not found"))?
|
||||
};
|
||||
Ok(async move {
|
||||
join_handle
|
||||
.await
|
||||
.map_err(|_| type_error("server join error"))??;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
// Asychronous version of op_flash_next. This can be a bottleneck under
|
||||
|
@ -1329,34 +1291,26 @@ async fn op_flash_wait_for_listening(
|
|||
// requests i.e `op_flash_next() == 0`.
|
||||
#[op]
|
||||
async fn op_flash_next_async(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
op_state: Rc<RefCell<OpState>>,
|
||||
server_id: u32,
|
||||
) -> u32 {
|
||||
let mut op_state = state.borrow_mut();
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
||||
let cancel_handle = ctx.cancel_handle.clone();
|
||||
let mut rx = ctx.rx.take().unwrap();
|
||||
// We need to drop the borrow before await point.
|
||||
drop(op_state);
|
||||
|
||||
if let Ok(Some(req)) = rx.recv().or_cancel(&cancel_handle).await {
|
||||
let mut op_state = state.borrow_mut();
|
||||
let ctx = {
|
||||
let mut op_state = op_state.borrow_mut();
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
||||
ctx as *mut ServerContext
|
||||
};
|
||||
// SAFETY: we cannot hold op_state borrow across the await point. The JS caller
|
||||
// is responsible for ensuring this is not called concurrently.
|
||||
let ctx = unsafe { &mut *ctx };
|
||||
let cancel_handle = &ctx.cancel_handle;
|
||||
|
||||
if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
|
||||
ctx.requests.insert(ctx.next_token, req);
|
||||
ctx.next_token += 1;
|
||||
// Set the rx back.
|
||||
ctx.rx = Some(rx);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Set the rx back.
|
||||
let mut op_state = state.borrow_mut();
|
||||
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
||||
if let Some(ctx) = flash_ctx.servers.get_mut(&server_id) {
|
||||
ctx.rx = Some(rx);
|
||||
}
|
||||
0
|
||||
}
|
||||
|
||||
|
@ -1524,11 +1478,11 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
|
|||
op_flash_next_async::decl(),
|
||||
op_flash_read_body::decl(),
|
||||
op_flash_upgrade_websocket::decl(),
|
||||
op_flash_drive_server::decl(),
|
||||
op_flash_wait_for_listening::decl(),
|
||||
op_flash_first_packet::decl(),
|
||||
op_flash_has_body_stream::decl(),
|
||||
op_flash_close_server::decl(),
|
||||
op_flash_drive_server::decl(),
|
||||
op_flash_make_request::decl(),
|
||||
op_flash_write_resource::decl(),
|
||||
])
|
||||
|
|
Loading…
Reference in a new issue