From 92b0a94c23c2c5df969969c20a6ef760e499f984 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 19 Oct 2018 18:38:27 -0400 Subject: [PATCH] Optimization: eager accept --- src/ops.rs | 3 +-- src/resources.rs | 53 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/ops.rs b/src/ops.rs index 55ee279fac..4adea46f65 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -10,7 +10,6 @@ use isolate::Op; use msg; use resources; use resources::Resource; -use tokio_util; use version; use flatbuffers::FlatBufferBuilder; @@ -1185,7 +1184,7 @@ fn op_accept( match resources::lookup(server_rid) { None => odd_future(errors::bad_resource()), Some(server_resource) => { - let op = tokio_util::accept(server_resource) + let op = resources::eager_accept(server_resource) .map_err(|err| DenoError::from(err)) .and_then(move |(tcp_stream, _socket_addr)| { new_conn(cmd_id, tcp_stream) diff --git a/src/resources.rs b/src/resources.rs index e40b019aa3..b6449c37eb 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -9,6 +9,7 @@ // handlers) look up resources by their integer id here. use errors::DenoError; +use tokio_util; use tokio_write; use futures; @@ -291,3 +292,55 @@ where }, } } + +type EagerAccept = Either< + tokio_util::Accept, + FutureResult<(TcpStream, SocketAddr), std::io::Error>, +>; + +#[cfg(windows)] +pub fn eager_accept(resource: Resource) -> EagerAccept { + Either::A(tokio_util::accept(resource)).into() +} + +// This is an optimization that Tokio should do. +// Attempt to call write() on the main thread. +#[cfg(not(windows))] +pub fn eager_accept(resource: Resource) -> EagerAccept { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&resource.rid); + match maybe_repr { + None => panic!("bad rid"), + Some(repr) => match repr { + Repr::TcpListener(ref mut listener) => { + // Unforunately we can't just call write() on tokio::net::TcpStream + use std::os::unix::io::AsRawFd; + use std::os::unix::io::FromRawFd; + use std::os::unix::io::IntoRawFd; + let mut std_listener = + unsafe { std::net::TcpListener::from_raw_fd(listener.as_raw_fd()) }; + let result = std_listener.accept(); + // std_listener will close when it gets dropped. Thus... + let _ = std_listener.into_raw_fd(); + match result { + Ok((std_stream, addr)) => { + let result = tokio::net::TcpStream::from_std( + std_stream, + &tokio::reactor::Handle::default(), + ); + let tokio_stream = result.unwrap(); + Either::B(futures::future::ok((tokio_stream, addr))) + } + Err(err) => { + if err.kind() == std::io::ErrorKind::WouldBlock { + Either::A(tokio_util::accept(resource)) + } else { + Either::B(futures::future::err(err)) + } + } + } + } + _ => Either::A(tokio_util::accept(resource)), + }, + } +}