1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

upgrade: tokio 1.0 (#8779)

Co-authored-by: Bert Belder <bertbelder@gmail.com>
This commit is contained in:
Bartek Iwańczuk 2021-01-12 08:50:02 +01:00 committed by GitHub
parent 36ff7bdf57
commit 275a5c65a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 384 additions and 571 deletions

676
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -18,7 +18,6 @@ path = "main.rs"
name = "denort"
path = "main_runtime.rs"
[[bench]]
name = "deno_bench"
harness = false
@ -29,12 +28,12 @@ deno_core = { path = "../core", version = "0.75.0" }
deno_fetch = { path = "../op_crates/fetch", version = "0.18.0" }
deno_web = { path = "../op_crates/web", version = "0.26.0" }
deno_websocket = { path = "../op_crates/websocket", version = "0.1.0" }
regex = "1.3.9"
regex = "1.4.3"
serde = { version = "1.0.116", features = ["derive"] }
[target.'cfg(windows)'.build-dependencies]
winres = "0.1.11"
winapi = "0.3.9"
winres = "0.1.11"
[dependencies]
deno_core = { path = "../core", version = "0.75.0" }
@ -43,28 +42,28 @@ deno_lint = "0.2.15"
deno_runtime = { path = "../runtime", version = "0.5.0" }
atty = "0.2.14"
base64 = "0.12.3"
byteorder = "1.3.4"
base64 = "0.13.0"
byteorder = "1.4.2"
clap = "2.33.3"
dissimilar = "1.0.2"
dprint-plugin-typescript = "0.38.1"
encoding_rs = "0.8.24"
env_logger = "0.7.1"
filetime = "0.2.12"
http = "0.2.1"
indexmap = "1.6.0"
jsonc-parser = "0.14.0"
encoding_rs = "0.8.26"
env_logger = "0.8.2"
filetime = "0.2.13"
http = "0.2.3"
indexmap = "1.6.1"
jsonc-parser = "0.15.1"
lazy_static = "1.4.0"
libc = "0.2.77"
log = { version = "0.4.11", features = ["serde"] }
lspower = "0.1.0"
notify = "5.0.0-pre.3"
libc = "0.2.82"
log = { version = "0.4.13", features = ["serde"] }
lspower = "0.3.0"
notify = "5.0.0-pre.4"
percent-encoding = "2.1.0"
regex = "1.3.9"
regex = "1.4.3"
ring = "0.16.19"
rustyline = { version = "7.1.0", default-features = false }
rustyline-derive = "0.4.0"
semver-parser = "0.9.0"
semver-parser = "0.10.2"
serde = { version = "1.0.116", features = ["derive"] }
shell-escape = "0.1.5"
sourcemap = "6.0.1"
@ -72,25 +71,25 @@ swc_bundler = "0.19.2"
swc_common = { version = "0.10.8", features = ["sourcemap"] }
swc_ecmascript = { version = "0.17.1", features = ["codegen", "dep_graph", "parser", "proposal", "react", "transforms", "typescript", "visit"] }
tempfile = "3.1.0"
termcolor = "1.1.0"
tokio = { version = "0.2.22", features = ["full"] }
tokio-rustls = "0.14.1"
uuid = { version = "0.8.1", features = ["v4"] }
termcolor = "1.1.2"
tokio = { version = "1.0.1", features = ["full"] }
tokio-rustls = "0.22.0"
uuid = { version = "0.8.2", features = ["v4"] }
walkdir = "2.3.1"
[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] }
fwdansi = "1.1.0"
winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] }
[target.'cfg(unix)'.dependencies]
nix = "0.19.0"
nix = "0.19.1"
[dev-dependencies]
# Used in benchmark
chrono = "0.4.15"
chrono = "0.4.19"
os_pipe = "0.9.2"
test_util = { path = "../test_util" }
tower-test = "0.3.0"
tower-test = "0.4.0"
[target.'cfg(unix)'.dev-dependencies]
exec = "0.3.1" # Used in test_raw_tty

View file

@ -18,21 +18,21 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::time::{delay_for, Delay};
use tokio::time::sleep;
const DEBOUNCE_INTERVAL_MS: Duration = Duration::from_millis(200);
type FileWatcherFuture<T> = Pin<Box<dyn Future<Output = T>>>;
struct Debounce {
delay: Delay,
sleep: Pin<Box<dyn Future<Output = ()>>>,
event_detected: Arc<AtomicBool>,
}
impl Debounce {
fn new() -> Self {
Self {
delay: delay_for(DEBOUNCE_INTERVAL_MS),
sleep: sleep(DEBOUNCE_INTERVAL_MS).boxed_local(),
event_detected: Arc::new(AtomicBool::new(false)),
}
}
@ -52,9 +52,9 @@ impl Stream for Debounce {
inner.event_detected.store(false, Ordering::Relaxed);
Poll::Ready(Some(()))
} else {
match inner.delay.poll_unpin(cx) {
match inner.sleep.poll_unpin(cx) {
Poll::Ready(_) => {
inner.delay = delay_for(DEBOUNCE_INTERVAL_MS);
inner.sleep = sleep(DEBOUNCE_INTERVAL_MS).boxed_local();
Poll::Pending
}
Poll::Pending => Poll::Pending,

View file

@ -67,10 +67,11 @@ pub fn server_capabilities(
color_provider: None,
execute_command_provider: None,
call_hierarchy_provider: None,
on_type_rename_provider: None,
semantic_highlighting: None,
semantic_tokens_provider: None,
workspace: None,
experimental: None,
linked_editing_range_provider: None,
moniker_provider: None,
}
}

View file

@ -53,7 +53,7 @@ impl TsServer {
// the language server...
let mut ts_runtime = start(false).expect("could not start tsc");
let mut runtime = create_basic_runtime();
let runtime = create_basic_runtime();
runtime.block_on(async {
while let Some((req, state_snapshot, tx)) = rx.recv().await {
let value = request(&mut ts_runtime, state_snapshot, req);
@ -482,6 +482,7 @@ impl RenameLocations {
}
Ok(lsp_types::WorkspaceEdit {
change_annotations: None,
changes: None,
document_changes: Some(lsp_types::DocumentChanges::Edits(
text_document_edit_map.values().cloned().collect(),

View file

@ -1,8 +1,7 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
pub fn create_basic_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
// This limits the number of threads for blocking operations (like for
@ -10,7 +9,7 @@ pub fn create_basic_runtime() -> tokio::runtime::Runtime {
// parallel for deno fmt.
// The default value is 512, which is an unhelpfully large thread pool. We
// don't ever want to have more than a couple dozen threads.
.max_threads(32)
.max_blocking_threads(32)
.build()
.unwrap()
}
@ -20,6 +19,6 @@ pub fn run_basic<F, R>(future: F) -> R
where
F: std::future::Future<Output = R>,
{
let mut rt = create_basic_runtime();
let rt = create_basic_runtime();
rt.block_on(future)
}

View file

@ -6,6 +6,7 @@ use crate::colors;
use crate::media_type::MediaType;
use crate::program_state::ProgramState;
use deno_core::error::AnyError;
use deno_core::futures::FutureExt;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_runtime::inspector::InspectorSession;
@ -277,7 +278,7 @@ async fn post_message_and_poll(
// A zero delay is long enough to yield the thread in order to prevent the loop from
// running hot for messages that are taking longer to resolve like for example an
// evaluation of top level await.
tokio::time::delay_for(tokio::time::Duration::from_millis(0)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(0)).await;
}
}
}
@ -305,7 +306,7 @@ async fn read_line_and_poll(
// Because an inspector websocket client may choose to connect at anytime when we have an
// inspector server we need to keep polling the worker to pick up new connections.
let mut timeout =
tokio::time::delay_for(tokio::time::Duration::from_millis(100));
tokio::time::sleep(tokio::time::Duration::from_millis(100)).boxed_local();
tokio::select! {
result = &mut line => {

View file

@ -13,18 +13,18 @@ repository = "https://github.com/denoland/deno"
path = "lib.rs"
[dependencies]
anyhow = "1.0.32"
futures = "0.3.8"
indexmap = "1.6.0"
anyhow = "1.0.38"
futures = "0.3.9"
indexmap = "1.6.1"
lazy_static = "1.4.0"
libc = "0.2.77"
log = "0.4.11"
libc = "0.2.82"
log = "0.4.13"
pin-project = "1.0.4"
rusty_v8 = "0.15.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
serde = { version = "1.0", features = ["derive"] }
smallvec = "1.4.2"
url = { version = "2.2", features = ["serde"] }
pin-project = "1.0.2"
serde = { version = "1.0.116", features = ["derive"] }
serde_json = { version = "1.0.61", features = ["preserve_order"] }
smallvec = "1.6.1"
url = { version = "2.2.0", features = ["serde"] }
[[example]]
name = "http_bench_bin_ops"
@ -36,4 +36,4 @@ path = "examples/http_bench_json_ops.rs"
# These dependencies are only used for the 'http_bench_*_ops' examples.
[dev-dependencies]
tokio = { version = "0.3.5", features = ["full"] }
tokio = { version = "1.0.1", features = ["full"] }

View file

@ -14,9 +14,10 @@ repository = "https://github.com/denoland/deno"
path = "lib.rs"
[dependencies]
bytes = "1.0.1"
deno_core = { version = "0.75.0", path = "../../core" }
bytes = "0.5.6"
reqwest = { version = "0.10.8", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli"] }
reqwest = { version = "0.11.0", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli"] }
serde = { version = "1.0.116", features = ["derive"] }
tokio = { version = "0.2.22", features = ["full"] }
tokio = { version = "1.0.1", features = ["full"] }
tokio-stream = "0.1.1"
tokio-util = "0.6.0"

View file

@ -16,6 +16,7 @@ use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
@ -38,10 +39,10 @@ use std::io::Read;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use tokio::io::stream_reader;
use tokio::io::AsyncReadExt;
use tokio::io::StreamReader;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::StreamReader;
pub use reqwest; // Re-export reqwest
@ -157,7 +158,7 @@ where
0 => {
// If no body is passed, we return a writer for streaming the body.
let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
request = request.body(Body::wrap_stream(rx));
request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
@ -247,7 +248,7 @@ pub async fn op_fetch_send(
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}));
let stream_reader = stream_reader(stream);
let stream_reader = StreamReader::new(stream);
let rid = state
.borrow_mut()
.resource_table
@ -288,7 +289,7 @@ pub async fn op_fetch_request_write(
.resource_table
.get::<FetchRequestBodyResource>(rid as u32)
.ok_or_else(bad_resource_id)?;
let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
body.send(Ok(buf)).or_cancel(cancel).await??;
@ -321,7 +322,7 @@ pub async fn op_fetch_response_read(
let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
let mut buf = data[0].clone();
let read = reader.read(&mut buf).or_cancel(cancel).await??;
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
Ok(json!({ "read": read }))
}

View file

@ -19,4 +19,4 @@ idna = "0.2.0"
serde = { version = "1.0.116", features = ["derive"] }
[dev-dependencies]
futures = "0.3.8"
futures = "0.3.9"

View file

@ -15,10 +15,10 @@ path = "lib.rs"
[dependencies]
deno_core = { version = "0.75.0", path = "../../core" }
http = "0.2.1"
tokio = { version = "0.2.22", features = ["full"] }
tokio-rustls = "0.14.1"
tokio-tungstenite = "0.11.0"
http = "0.2.3"
serde = { version = "1.0.116", features = ["derive"] }
webpki = "0.21.3"
webpki-roots = "=0.19.0" # Pinned to v0.19.0 to match 'reqwest'.
tokio = { version = "1.0.1", features = ["full"] }
tokio-rustls = "0.22.0"
tokio-tungstenite = "0.13.0"
webpki = "0.21.4"
webpki-roots = "0.21.0"

View file

@ -37,37 +37,37 @@ deno_websocket = { path = "../op_crates/websocket", version = "0.1.0" }
atty = "0.2.14"
dlopen = "0.1.8"
encoding_rs = "0.8.24"
env_logger = "0.7.1"
filetime = "0.2.12"
http = "0.2.1"
indexmap = "1.6.0"
encoding_rs = "0.8.26"
env_logger = "0.8.2"
filetime = "0.2.13"
http = "0.2.3"
hyper = { version = "0.14.2", features = ["server"] }
indexmap = "1.6.1"
lazy_static = "1.4.0"
libc = "0.2.77"
log = "0.4.11"
notify = "5.0.0-pre.3"
libc = "0.2.82"
log = "0.4.13"
notify = "5.0.0-pre.4"
percent-encoding = "2.1.0"
regex = "1.3.9"
regex = "1.4.3"
ring = "0.16.19"
rustyline = { version = "7.1.0", default-features = false }
rustyline-derive = "0.4.0"
serde = { version = "1.0.116", features = ["derive"] }
shell-escape = "0.1.5"
sys-info = "0.7.0"
termcolor = "1.1.0"
tokio = { version = "0.2.22", features = ["full"] }
tokio-rustls = "0.14.1"
uuid = { version = "0.8.1", features = ["v4"] }
hyper = "0.13.9"
webpki = "0.21.3"
webpki-roots = "=0.19.0" # Pinned to v0.19.0 to match 'reqwest'.
termcolor = "1.1.2"
tokio = { version = "1.0.1", features = ["full"] }
tokio-rustls = "0.22.0"
uuid = { version = "0.8.2", features = ["v4"] }
webpki = "0.21.4"
webpki-roots = "0.21.0"
[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] }
fwdansi = "1.1.0"
winapi = { version = "0.3.9", features = ["knownfolders", "mswsock", "objbase", "shlobj", "tlhelp32", "winbase", "winerror", "winsock2"] }
[target.'cfg(unix)'.dependencies]
nix = "0.19.0"
nix = "0.19.1"
[dev-dependencies]
# Used in benchmark

View file

@ -16,6 +16,7 @@ use deno_core::futures::pin_mut;
use deno_core::futures::prelude::*;
use deno_core::futures::select;
use deno_core::futures::stream::FuturesUnordered;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::task;
use deno_core::futures::task::Context;
use deno_core::futures::task::Poll;
@ -58,10 +59,10 @@ impl InspectorServer {
let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel();
let thread_handle = thread::spawn(move || {
let mut rt = crate::tokio_util::create_basic_runtime();
let rt = crate::tokio_util::create_basic_runtime();
let local = tokio::task::LocalSet::new();
local.block_on(
&mut rt,
&rt,
server(host, register_inspector_rx, shutdown_server_rx, name),
)
});
@ -182,9 +183,13 @@ fn handle_ws_request(
.status(http::StatusCode::BAD_REQUEST)
.body("Not a valid Websocket Request".into()),
});
let (parts, _) = req.into_parts();
let req = http::Request::from_parts(parts, body);
if resp.is_ok() {
tokio::task::spawn_local(async move {
let upgraded = body.on_upgrade().await.unwrap();
let upgraded = hyper::upgrade::on(req).await.unwrap();
let websocket =
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,

View file

@ -27,6 +27,7 @@ use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::io::AsyncSeekExt;
#[cfg(not(unix))]
use deno_core::error::generic_error;

View file

@ -99,7 +99,7 @@ fn op_fs_events_open(
let mut watcher: RecommendedWatcher =
Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
let res2 = res.map(FsEvent::from).map_err(AnyError::from);
let mut sender = sender.lock().unwrap();
let sender = sender.lock().unwrap();
// Ignore result, if send failed it means that watcher was already closed,
// but not all messages have been flushed.
let _ = sender.try_send(res2);

View file

@ -1,6 +1,5 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::ops::io::FullDuplexResource;
use crate::ops::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
@ -28,7 +27,7 @@ use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
use tokio::net::udp;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@ -67,7 +66,7 @@ async fn accept_tcp(
.resource_table
.get::<TcpListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
let mut listener = RcRef::map(&resource, |r| &r.listener)
let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@ -140,11 +139,11 @@ async fn receive_udp(
.resource_table
.get::<UdpSocketResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
let (size, remote_addr) = resource
.rd_borrow_mut()
.await
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
let (size, remote_addr) = socket
.recv_from(&mut zero_copy)
.try_or_cancel(resource.cancel_handle())
.try_or_cancel(cancel_handle)
.await?;
Ok(json!({
"size": size,
@ -212,11 +211,8 @@ async fn op_datagram_send(
.resource_table
.get::<UdpSocketResource>(rid as u32)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
let byte_length = resource
.wr_borrow_mut()
.await
.send_to(&zero_copy, &addr)
.await?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let byte_length = socket.send_to(&zero_copy, &addr).await?;
Ok(json!(byte_length))
}
#[cfg(unix)]
@ -237,7 +233,7 @@ async fn op_datagram_send(
.ok_or_else(|| {
custom_error("NotConnected", "Socket has been closed")
})?;
let mut socket = RcRef::map(&resource, |r| &r.socket)
let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let byte_length = socket.send_to(&zero_copy, address_path).await?;
@ -350,7 +346,8 @@ async fn op_shutdown(
let rid = args.rid as u32;
let how = args.how;
let shutdown_mode = match how {
// TODO(bartlomieju): no longer needed after Tokio 1.0 upgrade
let _shutdown_mode = match how {
0 => Shutdown::Read, // TODO: nonsense, remove me.
1 => Shutdown::Write,
_ => unimplemented!(),
@ -362,18 +359,18 @@ async fn op_shutdown(
.get_any(rid)
.ok_or_else(bad_resource_id)?;
if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
let wr = stream.wr_borrow_mut().await;
TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
let mut wr = stream.wr_borrow_mut().await;
wr.shutdown().await?;
return Ok(json!({}));
}
#[cfg(unix)]
if let Some(stream) = resource.downcast_rc::<StreamResource>() {
if stream.unix_stream.is_some() {
let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
.borrow_mut()
.await;
net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
wr.shutdown().await?;
return Ok(json!({}));
}
}
@ -396,7 +393,10 @@ impl Resource for TcpListenerResource {
}
}
type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
struct UdpSocketResource {
socket: AsyncRefCell<UdpSocket>,
cancel: CancelHandle,
}
impl Resource for UdpSocketResource {
fn name(&self) -> Cow<str> {
@ -404,7 +404,7 @@ impl Resource for UdpSocketResource {
}
fn close(self: Rc<Self>) {
self.cancel_read_ops()
self.cancel.cancel()
}
}
@ -434,6 +434,7 @@ fn listen_tcp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
@ -450,9 +451,13 @@ fn listen_udp(
addr: SocketAddr,
) -> Result<(u32, SocketAddr), AnyError> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
std_socket.set_nonblocking(true)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource::new(socket.split());
let socket_resource = UdpSocketResource {
socket: AsyncRefCell::new(socket),
cancel: Default::default(),
};
let rid = state.resource_table.add(socket_resource);
Ok((rid, local_addr))

View file

@ -19,7 +19,6 @@ use serde::Deserialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::remove_file;
use std::os::unix;
use std::path::Path;
use std::rc::Rc;
use tokio::net::UnixDatagram;
@ -73,7 +72,7 @@ pub(crate) async fn accept_unix(
.resource_table
.get::<UnixListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
let mut listener = RcRef::map(&resource, |r| &r.listener)
let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@ -113,7 +112,7 @@ pub(crate) async fn receive_unix_packet(
.resource_table
.get::<UnixDatagramResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
let mut socket = RcRef::map(&resource, |r| &r.socket)
let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);
@ -131,7 +130,7 @@ pub(crate) async fn receive_unix_packet(
pub fn listen_unix(
state: &mut OpState,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), AnyError> {
) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
if addr.exists() {
remove_file(&addr).unwrap();
}
@ -149,7 +148,7 @@ pub fn listen_unix(
pub fn listen_unix_packet(
state: &mut OpState,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), AnyError> {
) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
if addr.exists() {
remove_file(&addr).unwrap();
}

View file

@ -199,7 +199,7 @@ async fn op_run_status(
.get::<ChildResource>(rid)
.ok_or_else(bad_resource_id)?;
let mut child = resource.borrow_mut().await;
let run_status = (&mut *child).await?;
let run_status = child.wait().await?;
let code = run_status.code();
#[cfg(unix)]

View file

@ -60,7 +60,7 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);
let delay = tokio::time::delay_until(deadline.into());
let delay = tokio::time::sleep_until(deadline.into()).boxed_local();
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));

View file

@ -303,6 +303,7 @@ fn op_listen_tls(
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let tls_listener_resource = TlsListenerResource {
@ -341,7 +342,7 @@ async fn op_accept_tls(
.resource_table
.get::<TlsListenerResource>(rid)
.ok_or_else(|| bad_resource("Listener has been closed"))?;
let mut listener = RcRef::map(&resource, |r| &r.listener)
let listener = RcRef::map(&resource, |r| &r.listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let cancel = RcRef::map(resource, |r| &r.cancel);

View file

@ -1,8 +1,7 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
pub fn create_basic_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
// This limits the number of threads for blocking operations (like for
@ -10,7 +9,7 @@ pub fn create_basic_runtime() -> tokio::runtime::Runtime {
// parallel for deno fmt.
// The default value is 512, which is an unhelpfully large thread pool. We
// don't ever want to have more than a couple dozen threads.
.max_threads(32)
.max_blocking_threads(32)
.build()
.unwrap()
}
@ -20,6 +19,6 @@ pub fn run_basic<F, R>(future: F) -> R
where
F: std::future::Future<Output = R>,
{
let mut rt = create_basic_runtime();
let rt = create_basic_runtime();
rt.block_on(future)
}

View file

@ -430,7 +430,7 @@ pub fn run_web_worker(
) -> Result<(), AnyError> {
let name = worker.name.to_string();
let mut rt = create_basic_runtime();
let rt = create_basic_runtime();
// TODO(bartlomieju): run following block using "select!"
// with terminate

View file

@ -11,8 +11,8 @@ publish = false
crate-type = ["cdylib"]
[dependencies]
futures = "0.3.8"
deno_core = { path = "../core" }
futures = "0.3.9"
[dev-dependencies]
test_util = { path = "../test_util" }

View file

@ -12,17 +12,17 @@ name = "test_server"
path = "src/test_server.rs"
[dependencies]
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
bytes = "1"
lazy_static = "1.4.0"
os_pipe = "0.9"
regex = "1.3.9"
tempfile = "3.1.0"
hyper = { version = "0.14.2", features = ["server", "http1", "runtime"] }
tokio-tungstenite = "0.13"
tokio-rustls = "0.22"
async-stream = "0.3.0"
bytes = "1.0.1"
futures = "0.3.9"
hyper = { version = "0.14.2", features = ["server", "http1", "runtime"] }
lazy_static = "1.4.0"
os_pipe = "0.9.2"
regex = "1.4.3"
tempfile = "3.1.0"
tokio = { version = "1.0.1", features = ["full"] }
tokio-rustls = "0.22.0"
tokio-tungstenite = "0.13.0"
[target.'cfg(unix)'.dependencies]
pty = "0.2.2"