mirror of
https://github.com/denoland/deno.git
synced 2024-12-22 07:14:47 -05:00
fix: hang in op_http_next_request (#10836)
This commit adds "CancelHandle" to "ConnResource" and changes "op_http_next_request" to await for the cancel signal. In turn when async iterating over "Deno.HttpConn" the iterator breaks upon closing of the resource.
This commit is contained in:
parent
41e9a21307
commit
5457e741fa
2 changed files with 50 additions and 10 deletions
|
@ -339,3 +339,37 @@ unitTest(
|
||||||
await promise;
|
await promise;
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
unitTest(
|
||||||
|
{ perms: { net: true } },
|
||||||
|
async function httpServerNextRequestResolvesOnClose() {
|
||||||
|
const delay = (n: number) =>
|
||||||
|
new Promise((resolve) => setTimeout(resolve, n));
|
||||||
|
const httpConnList: Deno.HttpConn[] = [];
|
||||||
|
|
||||||
|
async function serve(l: Deno.Listener) {
|
||||||
|
for await (const conn of l) {
|
||||||
|
(async () => {
|
||||||
|
const c = Deno.serveHttp(conn);
|
||||||
|
httpConnList.push(c);
|
||||||
|
for await (const { respondWith } of c) {
|
||||||
|
respondWith(new Response("hello"));
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const l = Deno.listen({ port: 4500 });
|
||||||
|
serve(l);
|
||||||
|
|
||||||
|
await delay(300);
|
||||||
|
const res = await fetch("http://localhost:4500/");
|
||||||
|
const _text = await res.text();
|
||||||
|
|
||||||
|
// Close connection and listener.
|
||||||
|
httpConnList.forEach((conn) => conn.close());
|
||||||
|
l.close();
|
||||||
|
|
||||||
|
await delay(300);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
|
@ -14,7 +14,6 @@ use deno_core::futures::StreamExt;
|
||||||
use deno_core::op_async;
|
use deno_core::op_async;
|
||||||
use deno_core::op_sync;
|
use deno_core::op_sync;
|
||||||
use deno_core::AsyncRefCell;
|
use deno_core::AsyncRefCell;
|
||||||
use deno_core::CancelFuture;
|
|
||||||
use deno_core::CancelHandle;
|
use deno_core::CancelHandle;
|
||||||
use deno_core::CancelTryFuture;
|
use deno_core::CancelTryFuture;
|
||||||
use deno_core::Extension;
|
use deno_core::Extension;
|
||||||
|
@ -107,6 +106,7 @@ struct ConnResource {
|
||||||
hyper_connection: ConnType,
|
hyper_connection: ConnType,
|
||||||
deno_service: Service,
|
deno_service: Service,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
|
cancel: CancelHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnResource {
|
impl ConnResource {
|
||||||
|
@ -124,6 +124,10 @@ impl Resource for ConnResource {
|
||||||
fn name(&self) -> Cow<str> {
|
fn name(&self) -> Cow<str> {
|
||||||
"httpConnection".into()
|
"httpConnection".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn close(self: Rc<Self>) {
|
||||||
|
self.cancel.cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We use a tuple instead of struct to avoid serialization overhead of the keys.
|
// We use a tuple instead of struct to avoid serialization overhead of the keys.
|
||||||
|
@ -153,6 +157,8 @@ async fn op_http_request_next(
|
||||||
.get::<ConnResource>(conn_rid)
|
.get::<ConnResource>(conn_rid)
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
|
let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
|
||||||
|
|
||||||
poll_fn(|cx| {
|
poll_fn(|cx| {
|
||||||
let connection_closed = match conn_resource.poll(cx) {
|
let connection_closed = match conn_resource.poll(cx) {
|
||||||
Poll::Pending => false,
|
Poll::Pending => false,
|
||||||
|
@ -257,6 +263,7 @@ async fn op_http_request_next(
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
.try_or_cancel(cancel)
|
||||||
.await
|
.await
|
||||||
.map_err(AnyError::from)
|
.map_err(AnyError::from)
|
||||||
}
|
}
|
||||||
|
@ -298,6 +305,7 @@ fn op_http_start(
|
||||||
hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
|
hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
|
||||||
deno_service,
|
deno_service,
|
||||||
addr,
|
addr,
|
||||||
|
cancel: CancelHandle::default(),
|
||||||
};
|
};
|
||||||
let rid = state.resource_table.add(conn_resource);
|
let rid = state.resource_table.add(conn_resource);
|
||||||
return Ok(rid);
|
return Ok(rid);
|
||||||
|
@ -320,6 +328,7 @@ fn op_http_start(
|
||||||
hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
|
hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
|
||||||
deno_service,
|
deno_service,
|
||||||
addr,
|
addr,
|
||||||
|
cancel: CancelHandle::default(),
|
||||||
};
|
};
|
||||||
let rid = state.resource_table.add(conn_resource);
|
let rid = state.resource_table.add(conn_resource);
|
||||||
return Ok(rid);
|
return Ok(rid);
|
||||||
|
@ -381,7 +390,6 @@ async fn op_http_response(
|
||||||
let response_body_rid =
|
let response_body_rid =
|
||||||
state.borrow_mut().resource_table.add(ResponseBodyResource {
|
state.borrow_mut().resource_table.add(ResponseBodyResource {
|
||||||
body: AsyncRefCell::new(sender),
|
body: AsyncRefCell::new(sender),
|
||||||
cancel: CancelHandle::default(),
|
|
||||||
conn_rid: response_sender.conn_rid,
|
conn_rid: response_sender.conn_rid,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -484,12 +492,8 @@ async fn op_http_response_write(
|
||||||
.ok_or_else(bad_resource_id)?;
|
.ok_or_else(bad_resource_id)?;
|
||||||
|
|
||||||
let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
|
let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
|
||||||
let cancel = RcRef::map(resource, |r| &r.cancel);
|
|
||||||
|
|
||||||
let mut send_data_fut = body
|
let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
|
||||||
.send_data(Vec::from(&*buf).into())
|
|
||||||
.or_cancel(cancel)
|
|
||||||
.boxed_local();
|
|
||||||
|
|
||||||
poll_fn(|cx| {
|
poll_fn(|cx| {
|
||||||
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
|
if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
|
||||||
|
@ -501,8 +505,7 @@ async fn op_http_response_write(
|
||||||
|
|
||||||
send_data_fut.poll_unpin(cx).map_err(AnyError::from)
|
send_data_fut.poll_unpin(cx).map_err(AnyError::from)
|
||||||
})
|
})
|
||||||
.await?
|
.await?;
|
||||||
.unwrap(); // panic on send_data error
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -520,6 +523,10 @@ impl Resource for RequestBodyResource {
|
||||||
fn name(&self) -> Cow<str> {
|
fn name(&self) -> Cow<str> {
|
||||||
"requestBody".into()
|
"requestBody".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn close(self: Rc<Self>) {
|
||||||
|
self.cancel.cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ResponseSenderResource {
|
struct ResponseSenderResource {
|
||||||
|
@ -535,7 +542,6 @@ impl Resource for ResponseSenderResource {
|
||||||
|
|
||||||
struct ResponseBodyResource {
|
struct ResponseBodyResource {
|
||||||
body: AsyncRefCell<hyper::body::Sender>,
|
body: AsyncRefCell<hyper::body::Sender>,
|
||||||
cancel: CancelHandle,
|
|
||||||
conn_rid: ResourceId,
|
conn_rid: ResourceId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue