mirror of
https://github.com/denoland/deno.git
synced 2024-12-23 23:59:59 -05:00
refactor: fixes for futures (#3363)
After landing #3358 the benchmarks exploded indicating problems with workers and deno_core_http_bench. This PR dramatically fixes thread/syscall count that showed up on benchmarks. Thread count is not back to previous levels but difference went from hundreds/thousands to about ~50.
This commit is contained in:
parent
2b3afda625
commit
f356b2bd5e
5 changed files with 32 additions and 29 deletions
|
@ -272,8 +272,8 @@ impl TsCompiler {
|
|||
|
||||
let worker = TsCompiler::setup_worker(global_state.clone());
|
||||
let worker_ = worker.clone();
|
||||
worker.post_message(req_msg).unwrap();
|
||||
let first_msg_fut = async move {
|
||||
worker.post_message(req_msg).await.unwrap();
|
||||
let result = worker.await;
|
||||
if let Err(err) = result {
|
||||
// TODO(ry) Need to forward the error instead of exiting.
|
||||
|
@ -382,8 +382,8 @@ impl TsCompiler {
|
|||
.add("Compile", &module_url.to_string());
|
||||
let global_state_ = global_state.clone();
|
||||
|
||||
worker.post_message(req_msg).unwrap();
|
||||
let first_msg_fut = async move {
|
||||
worker.post_message(req_msg).await.unwrap();
|
||||
let result = worker.await;
|
||||
if let Err(err) = result {
|
||||
// TODO(ry) Need to forward the error instead of exiting.
|
||||
|
|
|
@ -86,14 +86,13 @@ impl WasmCompiler {
|
|||
let worker_ = worker.clone();
|
||||
let url = source_file.url.clone();
|
||||
|
||||
let _res = worker.post_message(
|
||||
serde_json::to_string(&base64_data)
|
||||
.unwrap()
|
||||
.into_boxed_str()
|
||||
.into_boxed_bytes(),
|
||||
);
|
||||
let fut = worker
|
||||
.post_message(
|
||||
serde_json::to_string(&base64_data)
|
||||
.unwrap()
|
||||
.into_boxed_str()
|
||||
.into_boxed_bytes(),
|
||||
)
|
||||
.then(move |_| worker)
|
||||
.then(move |result| {
|
||||
if let Err(err) = result {
|
||||
// TODO(ry) Need to forward the error instead of exiting.
|
||||
|
|
|
@ -271,7 +271,8 @@ fn op_host_post_message(
|
|||
let mut table = state.workers.lock().unwrap();
|
||||
// TODO: don't return bad resource anymore
|
||||
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
|
||||
tokio_util::block_on(worker.post_message(msg).boxed())
|
||||
worker
|
||||
.post_message(msg)
|
||||
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
|
||||
Ok(JsonOp::Sync(json!({})))
|
||||
}
|
||||
|
|
|
@ -144,17 +144,12 @@ impl Worker {
|
|||
/// Post message to worker as a host.
|
||||
///
|
||||
/// This method blocks current thread.
|
||||
pub fn post_message(
|
||||
self: &Self,
|
||||
buf: Buf,
|
||||
) -> impl Future<Output = Result<(), ErrBox>> {
|
||||
pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> {
|
||||
let channels = self.external_channels.lock().unwrap();
|
||||
let mut sender = channels.sender.clone();
|
||||
async move {
|
||||
let result = sender.send(buf).map_err(ErrBox::from).await;
|
||||
drop(sender);
|
||||
result
|
||||
}
|
||||
futures::executor::block_on(sender.send(buf))
|
||||
.map(|_| ())
|
||||
.map_err(ErrBox::from)
|
||||
}
|
||||
|
||||
/// Get message from worker as a host.
|
||||
|
@ -396,7 +391,7 @@ mod tests {
|
|||
|
||||
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
|
||||
|
||||
let r = futures::executor::block_on(worker_.post_message(msg).boxed());
|
||||
let r = worker_.post_message(msg);
|
||||
assert!(r.is_ok());
|
||||
|
||||
let maybe_msg =
|
||||
|
@ -409,7 +404,7 @@ mod tests {
|
|||
.to_string()
|
||||
.into_boxed_str()
|
||||
.into_boxed_bytes();
|
||||
let r = futures::executor::block_on(worker_.post_message(msg).boxed());
|
||||
let r = worker_.post_message(msg);
|
||||
assert!(r.is_ok());
|
||||
})
|
||||
}
|
||||
|
@ -439,7 +434,7 @@ mod tests {
|
|||
);
|
||||
|
||||
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
|
||||
let r = futures::executor::block_on(worker_.post_message(msg));
|
||||
let r = worker_.post_message(msg);
|
||||
assert!(r.is_ok());
|
||||
|
||||
futures::executor::block_on(worker_future).unwrap();
|
||||
|
|
|
@ -15,6 +15,7 @@ extern crate lazy_static;
|
|||
use deno::*;
|
||||
use futures::future::FutureExt;
|
||||
use futures::future::TryFutureExt;
|
||||
use futures::stream::StreamExt;
|
||||
use std::env;
|
||||
use std::future::Future;
|
||||
use std::io::Error;
|
||||
|
@ -24,6 +25,7 @@ use std::pin::Pin;
|
|||
use std::sync::Mutex;
|
||||
use std::sync::MutexGuard;
|
||||
use std::task::Poll;
|
||||
use tokio::net::tcp::Incoming;
|
||||
use tokio::prelude::Async;
|
||||
use tokio::prelude::AsyncRead;
|
||||
use tokio::prelude::AsyncWrite;
|
||||
|
@ -190,7 +192,7 @@ pub fn bad_resource() -> Error {
|
|||
Error::new(ErrorKind::NotFound, "bad resource id")
|
||||
}
|
||||
|
||||
struct TcpListener(tokio::net::TcpListener);
|
||||
struct TcpListener(Incoming);
|
||||
|
||||
impl Resource for TcpListener {}
|
||||
|
||||
|
@ -213,14 +215,19 @@ fn op_accept(
|
|||
) -> Pin<Box<HttpOp>> {
|
||||
let rid = record.arg as u32;
|
||||
debug!("accept {}", rid);
|
||||
let fut = futures::future::poll_fn(move |_cx| {
|
||||
let fut = futures::future::poll_fn(move |cx| {
|
||||
let mut table = lock_resource_table();
|
||||
let listener =
|
||||
table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?;
|
||||
match listener.0.poll_accept() {
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
|
||||
Ok(Async::NotReady) => Poll::Pending,
|
||||
let mut listener = futures::compat::Compat01As03::new(&mut listener.0);
|
||||
match listener.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)),
|
||||
Poll::Ready(Some(Ok(stream))) => {
|
||||
let addr = stream.peer_addr().unwrap();
|
||||
Poll::Ready(Ok((stream, addr)))
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
})
|
||||
.and_then(move |(stream, addr)| {
|
||||
|
@ -240,7 +247,8 @@ fn op_listen(
|
|||
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
|
||||
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
|
||||
let mut table = lock_resource_table();
|
||||
let rid = table.add("tcpListener", Box::new(TcpListener(listener)));
|
||||
let rid =
|
||||
table.add("tcpListener", Box::new(TcpListener(listener.incoming())));
|
||||
futures::future::ok(rid as i32).boxed()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue