From 25c276055b3dfdcecd77d18a0c6ebfcee531442d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 7 Nov 2019 17:11:15 +0100 Subject: [PATCH] refactor: remove cli::resources::Resource (#3285) --- cli/lib.rs | 2 - cli/ops/fetch.rs | 4 +- cli/ops/files.rs | 55 ++++++++++++++++-- cli/ops/io.rs | 134 ++++++++++++++++++++++++++++++++++++++------ cli/ops/net.rs | 32 ++++++----- cli/ops/repl.rs | 4 +- cli/ops/tls.rs | 17 +++--- cli/resources.rs | 137 +++++++-------------------------------------- cli/tokio_read.rs | 64 --------------------- cli/tokio_write.rs | 62 -------------------- cli/worker.rs | 9 +-- 11 files changed, 219 insertions(+), 301 deletions(-) delete mode 100644 cli/tokio_read.rs delete mode 100644 cli/tokio_write.rs diff --git a/cli/lib.rs b/cli/lib.rs index b6922c5916..a22f61799b 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -50,9 +50,7 @@ pub mod source_maps; mod startup_data; pub mod state; pub mod test_util; -mod tokio_read; mod tokio_util; -mod tokio_write; pub mod version; pub mod worker; diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 1d330ce41f..1433311710 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -62,10 +62,10 @@ pub fn op_fetch( } let body = res.into_body(); - let body_resource = resources::add_reqwest_body(body); + let rid = resources::add_reqwest_body(body); let json_res = json!({ - "bodyRid": body_resource.rid, + "bodyRid": rid, "status": status.as_u16(), "statusText": status.canonical_reason().unwrap_or(""), "headers": res_headers diff --git a/cli/ops/files.rs b/cli/ops/files.rs index c1e43ff956..04b5f98bfd 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -1,14 +1,19 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::deno_error::bad_resource; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; use crate::fs as deno_fs; use crate::ops::json_op; use crate::resources; +use crate::resources::CliResource; use crate::state::ThreadSafeState; use deno::*; use futures::Future; +use futures::Poll; use std; use std::convert::From; +use std::io::SeekFrom; use tokio; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { @@ -86,8 +91,8 @@ fn op_open( let is_sync = args.promise_id.is_none(); let op = open_options.open(filename).map_err(ErrBox::from).and_then( move |fs_file| { - let resource = resources::add_fs_file(fs_file); - futures::future::ok(json!(resource.rid)) + let rid = resources::add_fs_file(fs_file); + futures::future::ok(json!(rid)) }, ); @@ -116,6 +121,31 @@ fn op_close( Ok(JsonOp::Sync(json!({}))) } +#[derive(Debug)] +pub struct SeekFuture { + seek_from: SeekFrom, + rid: ResourceId, +} + +impl Future for SeekFuture { + type Item = u64; + type Error = ErrBox; + + fn poll(&mut self) -> Poll { + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::(self.rid) + .ok_or_else(bad_resource)?; + + let tokio_file = match resource { + CliResource::FsFile(ref mut file) => file, + _ => return Err(bad_resource()), + }; + + tokio_file.poll_seek(self.seek_from).map_err(ErrBox::from) + } +} + #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct SeekArgs { @@ -131,10 +161,25 @@ fn op_seek( _zero_copy: Option, ) -> Result { let args: SeekArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let offset = args.offset; + let whence = args.whence as u32; + // Translate seek mode to Rust repr. + let seek_from = match whence { + 0 => SeekFrom::Start(offset as u64), + 1 => SeekFrom::Current(i64::from(offset)), + 2 => SeekFrom::End(i64::from(offset)), + _ => { + return Err(ErrBox::from(DenoError::new( + ErrorKind::InvalidSeekMode, + format!("Invalid seek mode: {}", whence), + ))); + } + }; - let resource = resources::lookup(args.rid as u32)?; - let op = resources::seek(resource, args.offset, args.whence as u32) - .and_then(move |_| futures::future::ok(json!({}))); + let fut = SeekFuture { seek_from, rid }; + + let op = fut.and_then(move |_| futures::future::ok(json!({}))); if args.promise_id.is_none() { let buf = op.wait()?; Ok(JsonOp::Sync(buf)) diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 98ac2f3952..3ede4b4112 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,18 +1,76 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; +use crate::deno_error::bad_resource; use crate::ops::minimal_op; use crate::resources; +use crate::resources::CliResource; +use crate::resources::DenoAsyncRead; +use crate::resources::DenoAsyncWrite; use crate::state::ThreadSafeState; -use crate::tokio_read; -use crate::tokio_write; use deno::*; use futures::Future; +use futures::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("read", s.core_op(minimal_op(op_read))); i.register_op("write", s.core_op(minimal_op(op_write))); } +#[derive(Debug, PartialEq)] +enum IoState { + Pending, + Done, +} + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub fn read(rid: ResourceId, buf: T) -> Read +where + T: AsMut<[u8]>, +{ + Read { + rid, + buf, + state: IoState::Pending, + } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Read { + rid: ResourceId, + buf: T, + state: IoState, +} + +impl Future for Read +where + T: AsMut<[u8]>, +{ + type Item = usize; + type Error = ErrBox; + + fn poll(&mut self) -> Poll { + if self.state == IoState::Done { + panic!("poll a Read after it's done"); + } + + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::(self.rid) + .ok_or_else(bad_resource)?; + let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); + self.state = IoState::Done; + Ok(nread.into()) + } +} + pub fn op_read(rid: i32, zero_copy: Option) -> Box { debug!("read rid={}", rid); let zero_copy = match zero_copy { @@ -22,13 +80,58 @@ pub fn op_read(rid: i32, zero_copy: Option) -> Box { Some(buf) => buf, }; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_read::read(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nread)| Ok(nread as i32)), - ), + let fut = read(rid as u32, zero_copy) + .map_err(ErrBox::from) + .and_then(move |nread| Ok(nread as i32)); + + Box::new(fut) +} + +/// A future used to write some data to a stream. +#[derive(Debug)] +pub struct Write { + rid: ResourceId, + buf: T, + state: IoState, +} + +/// Creates a future that will write some of the buffer `buf` to +/// the stream resource with `rid`. +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +pub fn write(rid: ResourceId, buf: T) -> Write +where + T: AsRef<[u8]>, +{ + Write { + rid, + buf, + state: IoState::Pending, + } +} + +/// This is almost the same implementation as in tokio, difference is +/// that error type is `ErrBox` instead of `std::io::Error`. +impl Future for Write +where + T: AsRef<[u8]>, +{ + type Item = usize; + type Error = ErrBox; + + fn poll(&mut self) -> Poll { + if self.state == IoState::Done { + panic!("poll a Read after it's done"); + } + + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::(self.rid) + .ok_or_else(bad_resource)?; + let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); + self.state = IoState::Done; + Ok(nwritten.into()) } } @@ -41,12 +144,9 @@ pub fn op_write(rid: i32, zero_copy: Option) -> Box { Some(buf) => buf, }; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_write::write(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)), - ), - } + let fut = write(rid as u32, zero_copy) + .map_err(ErrBox::from) + .and_then(move |nwritten| Ok(nwritten as i32)); + + Box::new(fut) } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index d603b746bf..a4b3bf934e 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -4,7 +4,7 @@ use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; -use crate::resources::CoreResource; +use crate::resources::CliResource; use crate::resources::Resource; use crate::state::ThreadSafeState; use deno::*; @@ -132,13 +132,13 @@ fn op_accept( .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - Ok((tcp_stream_resource, local_addr, remote_addr)) + let rid = resources::add_tcp_stream(tcp_stream); + Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) - .and_then(move |(tcp_stream_resource, local_addr, remote_addr)| { + .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ - "rid": tcp_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) @@ -170,13 +170,13 @@ fn op_dial( .and_then(move |tcp_stream| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - Ok((tcp_stream_resource, local_addr, remote_addr)) + let rid = resources::add_tcp_stream(tcp_stream); + Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) - .and_then(move |(tcp_stream_resource, local_addr, remote_addr)| { + .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ - "rid": tcp_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) @@ -201,7 +201,6 @@ fn op_shutdown( let rid = args.rid as u32; let how = args.how; - let mut resource = resources::lookup(rid)?; let shutdown_mode = match how { 0 => Shutdown::Read, @@ -209,8 +208,15 @@ fn op_shutdown( _ => unimplemented!(), }; - // Use UFCS for disambiguation - Resource::shutdown(&mut resource, shutdown_mode)?; + let mut table = resources::lock_resource_table(); + let resource = table.get_mut::(rid).ok_or_else(bad_resource)?; + match resource { + CliResource::TcpStream(ref mut stream) => { + TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?; + } + _ => return Err(bad_resource()), + } + Ok(JsonOp::Sync(json!({}))) } @@ -228,7 +234,7 @@ struct TcpListenerResource { local_addr: SocketAddr, } -impl CoreResource for TcpListenerResource {} +impl Resource for TcpListenerResource {} impl Drop for TcpListenerResource { fn drop(&mut self) { diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index ba63c51095..723fb25718 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -5,7 +5,7 @@ use crate::ops::json_op; use crate::repl; use crate::repl::Repl; use crate::resources; -use crate::resources::CoreResource; +use crate::resources::Resource; use crate::state::ThreadSafeState; use deno::*; use std::sync::Arc; @@ -24,7 +24,7 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { struct ReplResource(Arc>); -impl CoreResource for ReplResource {} +impl Resource for ReplResource {} #[derive(Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index ee08f357a7..6e8348c915 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -6,7 +6,7 @@ use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; -use crate::resources::CoreResource; +use crate::resources::Resource; use crate::state::ThreadSafeState; use deno::*; use futures::Async; @@ -99,9 +99,9 @@ pub fn op_dial_tls( .connect(dnsname, tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let tls_stream_resource = resources::add_tls_stream(tls_stream); + let rid = resources::add_tls_stream(tls_stream); futures::future::ok(json!({ - "rid": tls_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) @@ -179,7 +179,7 @@ pub struct TlsListenerResource { local_addr: SocketAddr, } -impl CoreResource for TlsListenerResource {} +impl Resource for TlsListenerResource {} impl Drop for TlsListenerResource { fn drop(&mut self) { @@ -389,14 +389,13 @@ fn op_accept_tls( .accept(tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let tls_stream_resource = - resources::add_server_tls_stream(tls_stream); - Ok((tls_stream_resource, local_addr, remote_addr)) + let rid = resources::add_server_tls_stream(tls_stream); + Ok((rid, local_addr, remote_addr)) }) }) - .and_then(move |(tls_stream_resource, local_addr, remote_addr)| { + .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ - "rid": tls_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) diff --git a/cli/resources.rs b/cli/resources.rs index 3b070e06b6..2e6306761b 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -8,11 +8,10 @@ // descriptors". This module implements a global resource table. Ops (AKA // handlers) look up resources by their integer id here. -use crate::deno_error; use crate::deno_error::bad_resource; use crate::http_body::HttpBody; use deno::ErrBox; -pub use deno::Resource as CoreResource; +pub use deno::Resource; pub use deno::ResourceId; use deno::ResourceTable; @@ -21,8 +20,6 @@ use futures::Future; use futures::Poll; use reqwest::r#async::Decoder as ReqwestDecoder; use std; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::net::Shutdown; use std::process::ExitStatus; use std::sync::Mutex; use std::sync::MutexGuard; @@ -66,7 +63,7 @@ lazy_static! { } // TODO: move listeners out of this enum and rename to `StreamResource` -enum CliResource { +pub enum CliResource { Stdin(tokio::io::Stdin), Stdout(tokio::fs::File), Stderr(tokio::io::Stderr), @@ -84,60 +81,21 @@ enum CliResource { ChildStderr(tokio_process::ChildStderr), } -impl CoreResource for CliResource {} +impl Resource for CliResource {} pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { RESOURCE_TABLE.lock().unwrap() } -// Abstract async file interface. -// Ideally in unix, if Resource represents an OS rid, it will be the same. -#[derive(Clone, Debug)] -pub struct Resource { - pub rid: ResourceId, -} - -impl Resource { - // close(2) is done by dropping the value. Therefore we just need to remove - // the resource from the RESOURCE_TABLE. - pub fn close(&self) { - let mut table = lock_resource_table(); - table.close(self.rid).unwrap(); - } - - pub fn shutdown(&mut self, how: Shutdown) -> Result<(), ErrBox> { - let mut table = lock_resource_table(); - let repr = table - .get_mut::(self.rid) - .ok_or_else(bad_resource)?; - - match repr { - CliResource::TcpStream(ref mut f) => { - TcpStream::shutdown(f, how).map_err(ErrBox::from) - } - _ => Err(bad_resource()), - } - } -} - -impl Read for Resource { - fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { - unimplemented!(); - } -} - /// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncRead { fn poll_read(&mut self, buf: &mut [u8]) -> Poll; } -impl DenoAsyncRead for Resource { +impl DenoAsyncRead for CliResource { fn poll_read(&mut self, buf: &mut [u8]) -> Poll { - let mut table = lock_resource_table(); - let repr = table.get_mut(self.rid).ok_or_else(bad_resource)?; - - let r = match repr { + let r = match self { CliResource::FsFile(ref mut f) => f.poll_read(buf), CliResource::Stdin(ref mut f) => f.poll_read(buf), CliResource::TcpStream(ref mut f) => f.poll_read(buf), @@ -155,16 +113,6 @@ impl DenoAsyncRead for Resource { } } -impl Write for Resource { - fn write(&mut self, _buf: &[u8]) -> std::io::Result { - unimplemented!() - } - - fn flush(&mut self) -> std::io::Result<()> { - unimplemented!() - } -} - /// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncWrite { @@ -173,14 +121,9 @@ pub trait DenoAsyncWrite { fn shutdown(&mut self) -> Poll<(), ErrBox>; } -impl DenoAsyncWrite for Resource { +impl DenoAsyncWrite for CliResource { fn poll_write(&mut self, buf: &[u8]) -> Poll { - let mut table = lock_resource_table(); - let repr = table - .get_mut::(self.rid) - .ok_or_else(bad_resource)?; - - let r = match repr { + let r = match self { CliResource::FsFile(ref mut f) => f.poll_write(buf), CliResource::Stdout(ref mut f) => f.poll_write(buf), CliResource::Stderr(ref mut f) => f.poll_write(buf), @@ -201,41 +144,36 @@ impl DenoAsyncWrite for Resource { } } -pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource { +pub fn add_fs_file(fs_file: tokio::fs::File) -> ResourceId { let mut table = lock_resource_table(); - let rid = table.add("fsFile", Box::new(CliResource::FsFile(fs_file))); - Resource { rid } + table.add("fsFile", Box::new(CliResource::FsFile(fs_file))) } -pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource { +pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> ResourceId { let mut table = lock_resource_table(); - let rid = table.add("tcpStream", Box::new(CliResource::TcpStream(stream))); - Resource { rid } + table.add("tcpStream", Box::new(CliResource::TcpStream(stream))) } -pub fn add_tls_stream(stream: ClientTlsStream) -> Resource { +pub fn add_tls_stream(stream: ClientTlsStream) -> ResourceId { let mut table = lock_resource_table(); - let rid = table.add( + table.add( "clientTlsStream", Box::new(CliResource::ClientTlsStream(Box::new(stream))), - ); - Resource { rid } + ) } -pub fn add_server_tls_stream(stream: ServerTlsStream) -> Resource { +pub fn add_server_tls_stream(stream: ServerTlsStream) -> ResourceId { let mut table = lock_resource_table(); - let rid = table.add( + table.add( "serverTlsStream", Box::new(CliResource::ServerTlsStream(Box::new(stream))), - ); - Resource { rid } + ) } -pub fn add_reqwest_body(body: ReqwestDecoder) -> Resource { +pub fn add_reqwest_body(body: ReqwestDecoder) -> ResourceId { let body = HttpBody::from(body); let mut table = lock_resource_table(); - let rid = table.add("httpBody", Box::new(CliResource::HttpBody(body))); - Resource { rid } + table.add("httpBody", Box::new(CliResource::HttpBody(body))) } pub struct ChildResources { @@ -347,40 +285,3 @@ pub fn get_file(rid: ResourceId) -> Result { _ => Err(bad_resource()), } } - -pub fn lookup(rid: ResourceId) -> Result { - debug!("resource lookup {}", rid); - let table = lock_resource_table(); - let _ = table.get::(rid).ok_or_else(bad_resource)?; - Ok(Resource { rid }) -} - -pub fn seek( - resource: Resource, - offset: i32, - whence: u32, -) -> Box + Send> { - // Translate seek mode to Rust repr. - let seek_from = match whence { - 0 => SeekFrom::Start(offset as u64), - 1 => SeekFrom::Current(i64::from(offset)), - 2 => SeekFrom::End(i64::from(offset)), - _ => { - return Box::new(futures::future::err( - deno_error::DenoError::new( - deno_error::ErrorKind::InvalidSeekMode, - format!("Invalid seek mode: {}", whence), - ) - .into(), - )); - } - }; - - match get_file(resource.rid) { - Ok(mut file) => Box::new(futures::future::lazy(move || { - let result = file.seek(seek_from).map(|_| {}).map_err(ErrBox::from); - futures::future::result(result) - })), - Err(err) => Box::new(futures::future::err(err)), - } -} diff --git a/cli/tokio_read.rs b/cli/tokio_read.rs deleted file mode 100644 index 25c4df1918..0000000000 --- a/cli/tokio_read.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) 2019 Tokio Contributors. All rights reserved. MIT license. -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -// Forked from: https://github.com/tokio-rs/tokio/blob/9b3f8564af4bb1aee07fab3c401eb412ca5eeac5/tokio-io/src/io/read.rs -use crate::resources::DenoAsyncRead; -use deno::ErrBox; -use futures::{Future, Poll}; -use std::mem; - -/// This is almost the same implementation as in tokio, the only difference is -/// that error type is `ErrBox` instead of `std::io::Error`. - -#[derive(Debug)] -enum State { - Pending { rd: R, buf: T }, - Empty, -} - -/// Tries to read some bytes directly into the given `buf` in asynchronous -/// manner, returning a future type. -/// -/// The returned future will resolve to both the I/O stream and the buffer -/// as well as the number of bytes read once the read operation is completed. -pub fn read(rd: R, buf: T) -> Read -where - R: DenoAsyncRead, - T: AsMut<[u8]>, -{ - Read { - state: State::Pending { rd, buf }, - } -} - -/// A future which can be used to easily read available number of bytes to fill -/// a buffer. -/// -/// Created by the [`read`] function. -#[derive(Debug)] -pub struct Read { - state: State, -} - -impl Future for Read -where - R: DenoAsyncRead, - T: AsMut<[u8]>, -{ - type Item = (R, T, usize); - type Error = ErrBox; - - fn poll(&mut self) -> Poll<(R, T, usize), ErrBox> { - let nread = match self.state { - State::Pending { - ref mut rd, - ref mut buf, - } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])), - State::Empty => panic!("poll a Read after it's done"), - }; - - match mem::replace(&mut self.state, State::Empty) { - State::Pending { rd, buf } => Ok((rd, buf, nread).into()), - State::Empty => panic!("invalid internal state"), - } - } -} diff --git a/cli/tokio_write.rs b/cli/tokio_write.rs deleted file mode 100644 index 31b4cda307..0000000000 --- a/cli/tokio_write.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::resources::DenoAsyncWrite; -use deno::ErrBox; -use futures::{Future, Poll}; -use std::mem; - -/// A future used to write some data to a stream. -/// -/// This is created by the [`write`] top-level method. -/// -/// [`write`]: fn.write.html -#[derive(Debug)] -pub struct Write { - state: State, -} - -#[derive(Debug)] -enum State { - Pending { a: A, buf: T }, - Empty, -} - -/// Creates a future that will write some of the buffer `buf` to -/// the stream `a` provided. -/// -/// Any error which happens during writing will cause both the stream and the -/// buffer to get destroyed. -pub fn write(a: A, buf: T) -> Write -where - A: DenoAsyncWrite, - T: AsRef<[u8]>, -{ - Write { - state: State::Pending { a, buf }, - } -} - -/// This is almost the same implementation as in tokio, difference is -/// that error type is `ErrBox` instead of `std::io::Error`. -impl Future for Write -where - A: DenoAsyncWrite, - T: AsRef<[u8]>, -{ - type Item = (A, T, usize); - type Error = ErrBox; - - fn poll(&mut self) -> Poll<(A, T, usize), ErrBox> { - let nwritten = match self.state { - State::Pending { - ref mut a, - ref mut buf, - } => try_ready!(a.poll_write(buf.as_ref())), - State::Empty => panic!("poll a Read after it's done"), - }; - - match mem::replace(&mut self.state, State::Empty) { - State::Pending { a, buf } => Ok((a, buf, nwritten).into()), - State::Empty => panic!("invalid internal state"), - } - } -} diff --git a/cli/worker.rs b/cli/worker.rs index 6b49700156..eeda364c97 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -3,7 +3,7 @@ use crate::deno_error::bad_resource; use crate::fmt_errors::JSError; use crate::ops; use crate::resources; -use crate::resources::CoreResource; +use crate::resources::Resource; use crate::resources::ResourceId; use crate::state::ThreadSafeState; use deno; @@ -31,7 +31,7 @@ pub struct WorkerChannels { pub receiver: mpsc::Receiver, } -impl CoreResource for WorkerChannels {} +impl Resource for WorkerChannels {} /// Wraps deno::Isolate to provide source maps, ops for the CLI, and /// high-level module loading. @@ -374,12 +374,9 @@ mod tests { worker.execute(source).unwrap(); let worker_ = worker.clone(); - let rid = worker.state.rid; - let resource_ = resources::Resource { rid }; tokio::spawn(lazy(move || { worker.then(move |r| -> Result<(), ()> { - resource_.close(); r.unwrap(); Ok(()) }) @@ -413,12 +410,10 @@ mod tests { .unwrap(); let rid = worker.state.rid; - let resource = resources::Resource { rid }; let worker_ = worker.clone(); let worker_future = worker .then(move |r| -> Result<(), ()> { - resource.close(); println!("workers.rs after resource close"); r.unwrap(); Ok(())